1: Business Background:
1.1 Typical Business Background:
Alibaba has the world’s largest e-commerce platform, Taobao. Hundreds of millions of users shop on Taobao every day, and improving their experience is our top priority. So in this blog we will give you an example of how we do that with search and recommendation.
When a user searches on Taobao, he usually looks at the listed results, and the goal of search is to help the user find the right product. But what is "right" differs from one customer to another, so it is a pretty subjective matter. To satisfy everybody, we have to understand our users and our products better, and we use machine learning to do that. When a user shops on Taobao, he interacts with our system: he looks at a product, clicks on a product, or even purchases a product. These interactions become events that get captured and eventually ingested into a queue system such as Kafka. The idea is that we can use machine learning techniques to process the events in such an event queue to gain insights about our customers and our products. Search is a very interactive experience: a customer does a search, looks at the results, clicks on items, may come back to the results, scrolls, and may even do more searches. We want to make sure that the insights we gain from a user's behavior can be used immediately to optimize his search experience, so this pipeline has a very strict latency requirement. Flink is well suited for this job. Flink is a true streaming processor, which means it can provide high throughput with extremely low latency: it can achieve a latency of seconds or even sub-seconds.
The first step in machine learning is pre-processing. The events tracked into our event queue usually have incomplete information. For example, when a user clicks on a product, only the user ID and the product ID are captured in the event log, and such IDs alone do not tell us much about what really happened. We need to join these events with entity tables, that is, look them up in tables such as the user profile table and the product table. Because we are doing this join in a streaming fashion, we need to store the entity tables in a system that is really good at point lookups, and we decided to use HBase for that. Flink performs the streaming join between the event stream and the entity tables, which gives us a full picture of what happened. We then do further cleanup of the events and calculate real-time features. These real-time features can include things such as the inventory of a product, or aggregations such as the number of clicks on a product in a certain period. The real-time features get pushed to our online search and recommendation systems and are used to optimize the ranking and the experience for Taobao users. This is one part of the pipeline.
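The enrichment and feature steps above can be sketched in a few lines. This is a minimal, single-process illustration, not the Flink job from the talk: plain Python dicts stand in for the HBase entity tables, the table contents, `ClickEvent`, `SlidingClickCount`, and `compute_features` are hypothetical names, and events are assumed to arrive in event-time order. It shows the two ideas: a point lookup that completes each raw event, and a windowed click count as a real-time feature.

```python
from collections import defaultdict, deque
from dataclasses import dataclass

# Hypothetical in-memory stand-ins for the HBase entity tables.
USER_PROFILES = {"u1": {"age_group": "25-34", "city": "Hangzhou"}}
PRODUCTS = {"p1": {"category": "shoes", "inventory": 120}}


@dataclass
class ClickEvent:
    user_id: str
    product_id: str
    ts: float  # event time in seconds; events assumed to arrive in order


def enrich(event: ClickEvent) -> dict:
    """Streaming-join step: complete a raw event with point lookups on entity tables."""
    return {
        "user_id": event.user_id,
        "product_id": event.product_id,
        "ts": event.ts,
        "user": USER_PROFILES.get(event.user_id, {}),  # unknown user -> empty profile
        "product": PRODUCTS.get(event.product_id, {}),
    }


class SlidingClickCount:
    """Real-time feature: clicks per product in the window (ts - window_s, ts]."""

    def __init__(self, window_s: float):
        self.window_s = window_s
        self.clicks = defaultdict(deque)  # product_id -> click timestamps in window

    def add(self, product_id: str, ts: float) -> int:
        q = self.clicks[product_id]
        q.append(ts)
        while q and q[0] <= ts - self.window_s:  # evict clicks older than the window
            q.popleft()
        return len(q)


def compute_features(events, window_s=60.0):
    """Enrich each event and attach the current windowed click count."""
    counter = SlidingClickCount(window_s)
    rows = []
    for e in events:
        row = enrich(e)
        row["clicks_in_window"] = counter.add(e.product_id, e.ts)
        rows.append(row)
    return rows
```

In a real deployment the lookups would go to a point-lookup store and the window state would live in the stream processor's managed state; keeping both in memory here only makes the logic visible.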
The other part of the pipeline is online training. Here we treat impression events as negative samples, while click and purchase events are treated as positive samples. With Flink you can use any of your favorite online machine learning algorithms to update your models online, and we validate these online models periodically. Once a model passes validation, it gets pushed to our search and recommendation systems and is used to give customers an even better experience. In some cases online training alone can bring very good results.
For example, on Singles' Day, which is the Chinese version of Black Friday, we observed a 20% jump in sales from search simply by using this online model. So the difference can be dramatic. This whole pipeline looks pretty straightforward: with Flink we can do real-time machine learning. But the real world is always slightly more complex. We cannot expect our AI engineers to get things right in one shot; it is unlikely they will find a perfect model on the first try, and there are going to be a lot of iterations.
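To make the online training step concrete, here is a minimal sketch of one common online learner, logistic regression updated with one stochastic gradient step per sample, using the labeling rule described above (impressions negative, clicks and purchases positive). The talk does not say which algorithm Taobao uses; `OnlineLogisticRegression`, `passes_validation`, and the 0.8 accuracy threshold are hypothetical choices for illustration only.

```python
import math

# Labeling rule from the talk: impressions are negative, clicks/purchases positive.
LABELS = {"impression": 0.0, "click": 1.0, "purchase": 1.0}


def sigmoid(z: float) -> float:
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


class OnlineLogisticRegression:
    """Logistic regression updated with one SGD step per incoming sample."""

    def __init__(self, n_features: int, lr: float = 0.1):
        self.w = [0.0] * n_features
        self.b = 0.0
        self.lr = lr

    def predict(self, x):
        return sigmoid(self.b + sum(wi * xi for wi, xi in zip(self.w, x)))

    def update(self, x, event_type: str) -> None:
        y = LABELS[event_type]
        err = self.predict(x) - y  # gradient of the log loss w.r.t. the logit
        for i, xi in enumerate(x):
            self.w[i] -= self.lr * err * xi
        self.b -= self.lr * err


def passes_validation(model, holdout, min_accuracy=0.8) -> bool:
    """Hypothetical gate run periodically before pushing the model to serving."""
    correct = sum((model.predict(x) >= 0.5) == (LABELS[t] == 1.0) for x, t in holdout)
    return correct / len(holdout) >= min_accuracy
```

Each event is consumed once and discarded, which is what lets the model track changing behavior, for instance during a shopping festival, without retraining from scratch.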
1.2 More Complex Structure
Let's take a look at what it takes to do such iterations. This is a redrawing of the previous architecture diagram. Again, Flink subscribes to live user behavior events from Kafka and joins these event streams with entity tables, such as user profile and product tables, which are stored in systems like HBase or Cassandra. Then Flink calculates real-time features and trains real-time models. These features and models get pushed to our online search and recommendation systems to optimize ranking and the user experience. When an AI engineer pushes a model into production, he wants to find out whether it performs better than the existing models. For that he needs to do A/B testing, and he wants to see on a real-time dashboard how his model is performing. Flink is again well suited for this: Flink collects all the metrics for each model and saves the results in HBase. Then the AI engineer can use dashboards that query HBase to see how each model is performing. Suppose he looks at the dashboard and finds that his model is not performing as he wished. He wants to figure out what went wrong and tune his model, which requires a lot of data crunching and analysis. Doing that analysis with a system like HBase or Cassandra would be extremely inefficient and slow, so we decided to store this kind of data in a column store such as ClickHouse or Druid. With these systems, the AI engineer can send analytic queries to ClickHouse or Druid and expect the answers to come back very quickly. He plays with the data, does all kinds of analysis using real-time reports, eventually figures out what went wrong with his model, and comes up with a potential solution. To try out his new model, he could wait for new samples to arrive, but a more efficient approach is to use historical data.
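The per-model metrics behind such an A/B dashboard can be as simple as a click-through rate per model. The sketch below assumes each event is tagged with the ID of the model that served it; the `(model_id, event_type)` pair format and `ctr_by_model` are hypothetical. It aggregates a finite list, whereas the pipeline in the talk would compute the same thing continuously in Flink and write it to HBase.

```python
from collections import Counter


def ctr_by_model(events):
    """Aggregate (model_id, event_type) pairs into a click-through rate per model."""
    impressions, clicks = Counter(), Counter()
    for model_id, event_type in events:
        if event_type == "impression":
            impressions[model_id] += 1
        elif event_type == "click":
            clicks[model_id] += 1
    # CTR is only defined for models that received at least one impression
    return {m: clicks[m] / n for m, n in impressions.items()}
```

For example, a model with four impressions and one click gets a CTR of 0.25, and the dashboard compares that figure across the models in the experiment.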
We can also save the historical data in Hive, and the AI engineer can then use batch processing to do the pre-processing and offline training to validate his new models.
As we can see, we have started to store data in multiple storage systems. Whenever this happens, sooner or later you will need to run a query that joins tables and data from multiple systems.
So we need federated query support, and we use Drill and Presto for that purpose. They can also be used for batch acceleration (speeding up queries on data stored in Hive).
But no matter how good your acceleration is, there will be queries that are too complex to compute on the fly. For those, we decided to run the expensive queries periodically, for example once every five minutes, and store the results in a cache system such as Redis or MySQL. When the AI engineer needs the result, he just queries the cache and gets it immediately, which speeds up the iterations. As we can see, even though the original real-time machine learning pipeline started out simple and straightforward, pushing it into production and making it efficient requires many more components.
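The precompute-and-cache idea can be sketched as a small wrapper around an expensive query. This is a variation on the talk's setup, labeled as such: instead of a scheduled job that pushes results into Redis or MySQL every five minutes, this hypothetical `PeriodicResultCache` recomputes lazily on the first read after the period expires and keeps the result in process memory. The injectable `clock` parameter exists only to make the behavior easy to test.

```python
import time


class PeriodicResultCache:
    """Serve a cached query result; recompute at most once every `period_s` seconds."""

    def __init__(self, query_fn, period_s=300.0, clock=time.monotonic):
        self.query_fn = query_fn      # the expensive query, e.g. a federated join
        self.period_s = period_s      # 300 s mirrors the "every five minutes" example
        self.clock = clock
        self._result = None
        self._computed_at = None

    def get(self):
        now = self.clock()
        stale = self._computed_at is None or now - self._computed_at >= self.period_s
        if stale:
            self._result = self.query_fn()
            self._computed_at = now
        return self._result
```

The trade-off is the same in both variants: readers get answers instantly, at the cost of results that can be up to one period old.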
1.3 Typical Big Data Lambda Structure
The architecture becomes more and more complex very quickly. This is not a coincidence: we have observed the same pattern over and over again whenever someone tries to build a real-time data pipeline. If it doesn't happen right away, sooner or later the architecture will end up looking like this. The question is where this complexity comes from and whether there is a better way of doing this. So let's take a step back and look at the fundamental reason.
The first step in building a scalable data warehouse is batch processing: it goes through all the data and produces a complete result. The downside of batch processing is that your applications make decisions based on data that is hours or even days old. The Lambda architecture proposed a speed layer to fix this problem. The speed layer processes new events in a streaming fashion and produces incremental updates to the results produced by batch processing. Downstream applications can combine the results from the batch layer and the speed layer to get the final result. However, the results produced by the batch layer may not be efficiently queryable by applications, so the Lambda architecture further proposed a serving layer, which organizes the data in a way that can be queried efficiently. Downstream applications combine data from the serving layer and the speed layer to get the final view of the current data.
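The division of labor in the Lambda architecture can be shown with event counts per key. In this minimal sketch, which uses hypothetical names and a `(key, timestamp)` event format, the batch layer recomputes everything up to a cutoff, the speed layer counts only events after the cutoff, and the downstream application merges the two views. Note that the same counting logic is written twice, once per layer, which is exactly the duplication the next paragraphs point out.

```python
def batch_layer(events, cutoff_ts):
    """Complete but stale: recompute the view from all events up to the cutoff."""
    view = {}
    for key, ts in events:
        if ts <= cutoff_ts:
            view[key] = view.get(key, 0) + 1
    return view


def speed_layer(events, cutoff_ts):
    """Fresh but partial: incremental counts for events newer than the last batch run."""
    view = {}
    for key, ts in events:
        if ts > cutoff_ts:
            view[key] = view.get(key, 0) + 1
    return view


def merged_view(batch_view, speed_view):
    """What a downstream application sees: batch result plus recent increments."""
    merged = dict(batch_view)
    for key, delta in speed_view.items():
        merged[key] = merged.get(key, 0) + delta
    return merged
```

As long as both layers implement the same semantics, the merged view equals what a batch job over all events would produce; keeping them in sync is the burden a unified processor removes.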
There are two potential problems in this architecture:
The first problem is that we are using different systems for the batch layer and the speed layer, which introduces a lot of complexity into our system. A great achievement of Flink is that it is a unified processor that is good at both batch and stream processing. So we can use a single system, and Flink guarantees that the semantics of batch and stream processing are compatible with each other.
The second problem in this architecture is that the data is potentially stored in different systems: the results of batch processing are stored in the serving layer, and the results of stream processing are stored in the speed layer. This also adds complexity to the system.
1.4 Typical Big Data Structure Shortcomings
Let's take a look at all the possible choices for storing our data. In the open source world there are actually lots of options. For example, we can use Hive to store offline data; HBase and Cassandra are very good at storing real-time data and provide efficient point-lookup queries; on the other hand, Druid and ClickHouse are good at serving analytic queries. Having choices is usually good, but in this case it actually indicates that none of the options above is good enough on its own to meet the requirements of the Lambda architecture.
As a result, our application developers are forced to use multiple subsystems, and when this happens we end up with data islands:
- Data may need to be duplicated across different systems, which increases hardware cost.
- It is always a challenge to keep data consistent across different systems, which also increases maintenance cost.
- Each system has its own pitfalls, and learning about all of them can be a steep learning curve. So we hope to have a single storage system that can do all of this.
1.5 Simplified Big Data Structure
So ideally we would have an architecture like this: a unified processor that does both stream and batch processing. Flink is such a processor; it can do both batch and stream processing efficiently. The output of this unified data processor is then stored in a unified storage system, which serves downstream applications.
2: “Perfect” Data Lake Structure
Over the last couple of years there has been great progress in this direction from various data lake solutions. A data lake solution usually stores data in cloud storage such as S3 on AWS or OSS on Alibaba Cloud. The data lake stores raw ingested data, and on top of that we can run incremental ETL to produce other tables. All these tables can be queried by query engines to serve downstream applications.
This simplifies your architecture a lot, but there are two areas for improvement in this approach:
The first area is that data in current data lake solutions is not always fresh; it has a latency of minutes or more. This is probably the best you can do if your existing data processor is micro-batch based, but in our case we use Flink, a streaming-first data processor, so not making data immediately available to downstream applications would be a missed opportunity.
The second area for improvement is that in current data lake solutions the access pattern is geared toward analytic workloads. But as we saw in our real-time machine learning pipeline, point-lookup queries are as important as analytic queries: we need them to serve downstream applications as well as to do streaming joins in Flink.
3: New Concept HSAP (Hybrid Serving/Analytical Processing)
3.1 What is HSAP
To address both of these problems, we propose a new paradigm, which we call Hybrid Serving/Analytical Processing (HSAP).
In such a system:
- It provides unified data storage for data produced by both batch processors and stream processors.
- It is capable of ingesting real-time data at an extremely high rate, as high as tens of millions of events per second.
- It makes data available to downstream applications immediately, without delay, so the data is always fresh.
- It offers unified data storage for all the important real-time data access patterns.
Data Access Patterns:
The bottom of this slide shows a classification of different data access patterns. On the far right we have transactional workloads, which have the most stringent requirements in terms of latency and SLA. Next we have serving workloads, which include simple queries such as point lookups; these workloads have very high QPS requirements and strict latency requirements, as low as tens of milliseconds or even single milliseconds. Then we have analytic workloads, whose QPS may be a bit lower but whose queries are much more complex, and users still expect them to finish within seconds or even sub-seconds. On the far left we have batch workloads, which do not have strict latency expectations but need to process vast amounts of data. HSAP covers the access patterns in the middle: all real-time serving and analytic query patterns are supported efficiently by an HSAP system.
3.2 Big Data Structure Based on HSAP
With an HSAP system in place, our architecture now looks like this. Again we use a unified data processor for both batch and streaming jobs, and all the output from these jobs is stored in the HSAP system. The HSAP system serves all downstream applications; for example, it can support real-time dashboards, real-time reports, and other real-time applications. One interesting thing about HSAP is that, because it is good at serving workloads as well as analytic workloads, Flink can store its entity tables in it as well and use the HSAP system to do streaming joins.
3.3 How we are going to build such an HSAP system
When you start building a new system, the first thing to consider is the interface to your users, and we want the learning curve for our users to be as smooth as possible. So we decided to leverage an existing ecosystem, and in this case we chose Postgres. Postgres has very complete support for all the functionality we need, and it is very popular in this domain. By being compatible with Postgres we gain a lot of value, for example:
- Tools that are available for Postgres can be used with our system.
- You can use the Postgres JDBC driver to connect to our server.
- Our protocol is compatible with Postgres, so you can use tools such as pgAdmin to connect to, query, and manage our server.
- You can even use your favorite BI tool, such as Tableau, to do complex BI analytics with our system.
Another big advantage of being compatible with Postgres is that all the documentation about Postgres applies to our system as well. For example, if you ever wonder how to express some functionality in SQL, you can search the internet, and chances are you will find your answer. Yet another advantage of Postgres is its large number of third-party extensions, which extend the functionality of Postgres itself. Because we are compatible with Postgres, it is also easier for us to extend the functionality of our server.
4: New-Generation Real-Time Interactive Data Engine: Hologres
The system we are building is called Hologres. I have just explained where the "gres" part comes from: Postgres. The "holo" stands for holographic, so let me spend a couple of words on this. A black hole has an event horizon: any object that enters the event horizon can never come out again; it is lost in the black hole forever. Imagine throwing a book into a black hole: once the book crosses the event horizon, it can never come out. So the question is, what happens to the information contained in your book? Is it lost forever? That appears to be the case, because you can never get the book out, but this is problematic: it violates one of the fundamental principles of quantum physics. In quantum physics, information is a conserved quantity, just like energy: it can be neither destroyed nor created. So if the information in the book were lost when you throw it into a black hole, that would be a problem. This paradox existed for many years, and eventually physicists came up with a potential solution. The idea is that the world is holographic, which means that even though the world looks three-dimensional, the boundary surface of this three-dimensional space actually contains all of its information. Think of a hologram: a hologram is a special type of two-dimensional film, but it encodes all the information about a three-dimensional object. If you look at a hologram from different angles, it shows you the three-dimensional object from different perspectives. In the case of a black hole, its boundary, the two-dimensional event horizon, encodes all the information contained in the black hole itself, including all the information in the book we threw into it.
4.1 Hologres Structure
In our case, you can throw all your data into Hologres. Hologres stores all your data, and at the same time you can retrieve any information you want from Hologres, from whatever perspective you want.
Now let me explain the architecture of Hologres and the challenges we faced in building it. This is the overall architecture diagram of Hologres. Your BI tools and applications connect to Hologres using the PostgreSQL protocol and issue SQL that is compatible with Postgres. Each request goes to a load balancer, which chooses one of the front-end servers. The front-end server compiles and optimizes the query and generates a query plan consisting of many query fragments. These query fragments are distributed to a set of Blackhole servers. The Blackhole servers do the heavy lifting of query processing, and the data is stored in a distributed storage system such as Pangu or HDFS.
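The fragment-based execution described above can be illustrated with the textbook two-phase aggregation for a query like `SELECT category, SUM(amount) FROM sales GROUP BY category`. This sketch is an assumption about the general technique, not Hologres internals: the shard data, `partial_sum_fragment`, and `final_merge_fragment` are hypothetical, a thread pool stands in for the worker servers, and where the final merge runs is not stated in the talk.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical shards of a sales table, one per worker server.
SHARDS = [
    [("shoes", 10.0), ("hats", 5.0)],
    [("shoes", 7.5)],
    [("hats", 2.5), ("bags", 20.0)],
]


def partial_sum_fragment(rows):
    """Fragment run next to the data: local GROUP BY category, SUM(amount)."""
    out = {}
    for category, amount in rows:
        out[category] = out.get(category, 0.0) + amount
    return out


def final_merge_fragment(partials):
    """Fragment that combines the partial aggregates into the final result."""
    total = {}
    for part in partials:
        for category, amount in part.items():
            total[category] = total.get(category, 0.0) + amount
    return total


def run_query(shards):
    # Scatter the partial fragments to "workers", then gather and merge.
    with ThreadPoolExecutor(max_workers=len(shards)) as pool:
        partials = list(pool.map(partial_sum_fragment, shards))
    return final_merge_fragment(partials)
```

Splitting the aggregate this way means only small partial results cross the network, which matters once storage and compute live on different machines.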
4.2 Hologres: Cloud Native
The first design decision we made is to build a cloud-native system using a storage-compute disaggregation architecture. Over the last couple of years there has been tremendous progress in network technology: 10-gigabit networks have become a commodity, and we are starting to see 25-, 50-, or even 100-gigabit networks. So throughput is largely no longer an issue, and it is now possible to store the data on different machines from the ones doing the computation.
Separation of Storage and Compute:
This architecture gives us two main advantages:
The first advantage is that when you run out of storage space, you can simply add more storage nodes, and if you run out of compute capacity, you can simply add more compute nodes. This is especially convenient in a cloud environment, where resources are very elastic and you only pay for the resources you really need.
The second advantage is that it makes operations much easier. In the old days, if you wanted to add a compute node, you had to add a new machine, and before it could serve any traffic you needed to migrate data to it. Data migrations are always complex and slow. With our new approach, a newly added compute node can serve online traffic immediately.
Even though advances in network technology have largely solved the throughput issue, accessing data on a remote system always incurs some latency. If we do not handle it carefully, this latency can quickly limit the throughput we get from remote storage. To solve this, we decided to use a completely asynchronous framework to hide the latency of remote data access. With this approach, the throughput of our system is determined only by the throughput of the remote storage itself, and it is not sensitive to the latency of accessing remote storage.
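The effect of an asynchronous framework on remote reads can be demonstrated with Python's `asyncio`. In this sketch, which is not Hologres code, `asyncio.sleep` stands in for a non-blocking network read, and the 50 ms latency, `read_block`, and the `max_in_flight` bound are hypothetical. Issuing reads one after another costs roughly one round trip per read; keeping many reads in flight overlaps their latencies, so total time is governed by how many requests the storage can serve concurrently rather than by the round-trip time.

```python
import asyncio
import time

REMOTE_LATENCY_S = 0.05  # hypothetical round-trip latency to remote storage


async def read_block(block_id: int) -> str:
    await asyncio.sleep(REMOTE_LATENCY_S)  # stand-in for a non-blocking network read
    return f"block-{block_id}"


async def read_sequential(ids):
    # Each read waits for the previous one: total time ~ len(ids) * latency.
    return [await read_block(i) for i in ids]


async def read_concurrent(ids, max_in_flight=64):
    # Many reads in flight at once; the semaphore bounds outstanding requests.
    sem = asyncio.Semaphore(max_in_flight)

    async def bounded(i):
        async with sem:
            return await read_block(i)

    return await asyncio.gather(*(bounded(i) for i in ids))


def timed(coro):
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start
```

With 20 reads, the sequential version takes about one second while the concurrent version finishes in roughly one round trip.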
Modern hardware also has lots of processors. To make use of them, people use threads, which means writing thread-safe programs, and the common practice is to use locking. Whenever you introduce locks, you add contention to your system, and it will not scale efficiently because a lot of CPU cycles are wasted spinning on locks. We adopted a cooperative scheduling mechanism instead. With such a mechanism, we can guarantee that a piece of code is never switched out in the middle of its execution, so we do not need to take a lock at all and the code is still thread-safe. This greatly improves the scalability of our system.
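The key property of cooperative scheduling, that a task is only switched out where it chooses to yield, can be shown with Python generators and a tiny round-robin scheduler. This is a single-threaded sketch of the principle; the talk does not describe how Hologres maps cooperative tasks onto OS threads, and `worker`, `run_cooperatively`, and `shared` are hypothetical names. The read-modify-write on the shared counter needs no lock because no other task can run between the read and the write.

```python
from collections import deque

shared = {"counter": 0}  # state shared by all tasks, deliberately unprotected


def worker(n_increments):
    for _ in range(n_increments):
        # Read-modify-write without a lock: no other task runs until we yield.
        value = shared["counter"]
        shared["counter"] = value + 1
        yield  # the only point where this task gives up the CPU


def run_cooperatively(tasks):
    """Round-robin scheduler: each task runs until it yields; it is never preempted."""
    ready = deque(tasks)
    while ready:
        task = ready.popleft()
        try:
            next(task)
        except StopIteration:
            continue  # task finished; drop it
        ready.append(task)
```

Running eight workers of 1,000 increments each always ends with the counter at exactly 8,000; with preemptive threads and no lock, increments could be lost between the read and the write.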
Modern hardware also has lots of memory, and using memory efficiently is critical to high-performance storage. We have a very sophisticated memory and cache management system that makes the best use of our memory.
4.3 Query Processing or Compute Part for Hologres
As an HSAP system, Hologres needs to serve a very diverse set of real-time workloads, including complex analytic queries as well as simple serving queries. In this situation, we do not want big analytic queries to block the processing of low-latency point-lookup queries. So we introduced our own custom scheduler to guarantee the SLA of individual queries, making sure point-lookup queries still return within milliseconds even when the system is loaded with many big analytic queries. On the other hand, if the system is running only a single query, we still want that query to use all the hardware resources in the cluster to finish as soon as possible. Fortunately, the cooperative scheduling mechanism and the asynchronous framework we put in place allow us to launch massive parallelism efficiently with very little overhead, so a single query in our system can make full use of all the hardware resources in the cluster. Our query engine is also very efficient at leveraging processors: we use vectorization, we make use of SIMD instructions, and we optimize for the AVX-512 instruction set. Our query engine has a very deep understanding of how data is laid out in the storage system, so we optimize for that layout, and as a result we see very good improvements in query performance. Both our query engine and our storage engine are developed in native C++ code, which also contributes to performance. For example, for point-lookup queries in serving scenarios we outperform HBase by an order of magnitude, and for complex analytic queries we outperform Greenplum by 3x.
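One simple way to keep point lookups fast while analytic queries run is to split analytic work into small units and schedule by priority. The talk does not describe how the Hologres scheduler works; this sketch assumes strict two-class priority, one work unit per tick, and hypothetical names (`WorkScheduler`, `simulate`, `SERVING`, `ANALYTIC`). A lookup that arrives while a scan is in progress runs at the next tick instead of waiting for the whole scan.

```python
import heapq
import itertools

SERVING, ANALYTIC = 0, 1  # lower value = higher priority (assumed strict priority)


class WorkScheduler:
    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # FIFO order within the same priority class

    def submit(self, priority: int, unit: str) -> None:
        heapq.heappush(self._heap, (priority, next(self._seq), unit))

    def next_unit(self):
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]


def simulate(arrivals, ticks):
    """Run one work unit per tick; `arrivals` maps tick -> [(priority, unit), ...]."""
    sched = WorkScheduler()
    executed = []
    for tick in range(ticks):
        for priority, unit in arrivals.get(tick, []):
            sched.submit(priority, unit)
        unit = sched.next_unit()
        if unit is not None:
            executed.append(unit)
    return executed
```

For example, a five-fragment scan submitted at tick 0 and a lookup arriving at tick 2 execute as `scan-0, scan-1, lookup-a, scan-2, scan-3, scan-4`. Strict priority can starve analytic work under a constant stream of lookups, so a production scheduler would also need some form of fairness.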
4.4 Real-Time Data Warehouse: Flink+Hologres
Let's take a look at what our system looks like with this new architecture. Again we use Flink to do real-time machine learning as well as offline training. The difference is that now we can store all the data in a single system, Hologres. Hologres stores entity tables such as user profiles and products. It also stores all the samples produced by Flink and the metrics from A/B testing, so an AI engineer can use a real-time dashboard that issues point queries against Hologres to see how his model is performing. He can use real-time reports or even a BI tool to issue analytic queries against Hologres to understand why his model performs the way it does and gain insight into it. If he wants to try out a different model, he can query Hologres for the data, do offline training, and try out his model. Furthermore, Flink also uses Hologres for streaming joins, because it is very efficient at point lookups. So you can see that with Hologres the architecture of the system is simplified a lot, which makes the iterations of AI engineers much more efficient. Hologres not only serves internal customers at Alibaba; it is also publicly available on Alibaba Cloud.
In this talk we proposed a new architecture, the HSAP system, which greatly simplifies application development. At the same time, an HSAP system is very good at ingesting data at extremely high throughput, and it serves fresh data without sacrificing performance or scalability. When your business makes decisions based on real-time data, you get more value out of your data, which can boost the performance of your business, just as it did for Taobao. In addition, the ground-up design and sophisticated optimizations in Hologres help your business reduce its TCO. So in short, with Flink and Hologres you can make your business real-time without any compromises.
Speech Record: https://www.youtube.com/watch?v=TPmnw6sWgMU&feature=emb_logo