Design Patterns for Calculating User Profiles in Real Time

Knowing your users’ preferences and behavior can help optimize the user’s E-journey by improving search relevance and presenting relevant products. One way to leverage those preferences is to calculate a passive user profile based on the user’s interactions, such as ad views and searches.

At mobile.de, Germany's largest online vehicle marketplace, the user profile contains preferences for vehicle attributes such as color, price, and mileage. These are represented as likelihood distributions that allow us to infer how likely a user is to, for example, view or “favourite” cars with certain features. The user profile is calculated and updated constantly in real time, so that it reflects the user’s actions along the E-journey almost immediately. The challenges in designing such a system are considerable, and I would like to share some of the possible tradeoffs.

In a nutshell, updating a user profile in real time is a stateful stream processing problem: the state is the user profile, the state key could be the user ID, and the state update operation can be as simple as a counter increment or an average/variance update.
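
As a minimal sketch (not mobile.de’s actual code, and with illustrative names), such an update operation might combine a counter increment for a categorical attribute with an online mean update for a numeric one:

```scala
// A sketch of a user-profile state update; all names are illustrative.
case class Profile(colorViews: Map[String, Int] = Map.empty,
                   priceMean: Double = 0.0,
                   priceCount: Long = 0L)

// One ad-view event updates the profile: a counter increment for the color
// dimension and an incremental (online) mean update for the price dimension.
def update(p: Profile, color: String, price: Double): Profile = {
  val views = p.colorViews.getOrElse(color, 0) + 1
  val n     = p.priceCount + 1
  val mean  = p.priceMean + (price - p.priceMean) / n
  Profile(p.colorViews.updated(color, views), mean, n)
}
```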

One of the most important choices in the design of such a system, and the main focus of this post, is how to store the stream state.

There are basically two major options: local storage or an external storage (global to all workers). Most stream processing frameworks are designed to process a stream of events in a distributed fashion by starting multiple workers (executors) on multiple nodes. Each worker processes a subset of the events and can store a fraction of the stream state inside an embedded data store that is accessible only by that worker.

Typical choices for local storage are a simple in-memory key-value data structure, such as a dictionary, or an embedded key-value data store. The stream state can then be queried by exposing a REST endpoint on each worker. The main benefits of using local storage for state management are that there is no external data store to maintain, and that update and query latencies are extremely low thanks to data locality and the absence of the network bottlenecks typical of external storage.

On the other hand, scaling out local storage to fit a growing state can be quite challenging, as the local storage is bounded by the space available on the node hosting the worker. Recovering from failures can be slow when the state is very large, and in some cases recovery may even produce duplicates (for example, by re-reading messages from Kafka that were not committed because of the failure). One more important point to consider is whether the state must always be queryable, even when a worker fails. If that is one of the main business requirements, it is better to avoid the local storage option, since the failure of a specific worker would make a subset of the state unavailable.
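
To illustrate (again just a sketch, not any specific framework’s API), local state management can be as simple as an embedded concurrent map that only its owning worker writes to, reusing the Profile sketch from above:

```scala
import scala.collection.concurrent.TrieMap

// Each worker embeds its own fraction of the state in a local map.
// Only this worker's event loop writes to it; a REST endpoint exposed by
// the worker (not shown) could read from it to make the state queryable.
object LocalState {
  private val profiles = TrieMap.empty[String, Profile]

  def onEvent(userId: String, color: String, price: Double): Unit =
    profiles.update(userId, update(profiles.getOrElse(userId, Profile()), color, price))

  def query(userId: String): Option[Profile] = profiles.get(userId)
}
```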

The second option for storing the state is external storage; popular choices here are Cassandra and HBase. External storage has the advantages of more resilience to failures, stronger high availability, and a more robust recovery mechanism. Additionally, it can be scaled out and adjusted much better to handle an increasing request rate (querying the state) and a growing state size. Furthermore, the state can be kept queryable at all times simply by deploying a REST service on top of the external storage: even if some of the stream processing workers are down, the state is still queryable via the REST service. On the other hand, updating and querying stream state in an external storage introduces higher latencies, mostly due to network bottlenecks, as well as possible race condition scenarios, which are the price of the external storage being accessible by all workers at the same time.

Race conditions in a distributed system can easily lead to an incorrect state, as the following example illustrates: worker-1 is processing an ad view event of user X with a blue car, and at exactly the same time worker-2 is processing another ad view event, also of user X, but with a red car. The current state (user profile) in the external storage for user X is {color-blue: 2 views}. The sequence of operations performed by each worker is as follows: read the current state of user X from the external storage, apply the state update operation (a simple counter increment) locally, and finally write the new state back to the external storage. Eventually worker-1 tries to write {color-blue: 3 views} whereas worker-2 tries to write {color-blue: 2 views, color-red: 1 view}. Which of the writes wins the race is not deterministic, and either way the final state for user X is incorrect.
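
To make the lost update concrete, here is a small self-contained simulation (illustrative only): two threads play the two workers and a shared map plays the external storage, each performing the same non-atomic read-modify-write:

```scala
import java.util.concurrent.ConcurrentHashMap

object RaceDemo extends App {
  // The "external storage": user X's profile, currently {color-blue: 2 views}.
  val store = new ConcurrentHashMap[String, Map[String, Int]]()
  store.put("userX", Map("color-blue" -> 2))

  // The non-atomic read-modify-write each worker performs.
  def process(color: String): Unit = {
    val state   = store.get("userX")                                   // 1. read
    val updated = state.updated(color, state.getOrElse(color, 0) + 1)  // 2. update locally
    store.put("userX", updated)                                        // 3. write back
  }

  val worker1 = new Thread(() => process("color-blue"))
  val worker2 = new Thread(() => process("color-red"))
  worker1.start(); worker2.start()
  worker1.join(); worker2.join()

  // The correct result is {color-blue: 3, color-red: 1}; depending on timing,
  // one write can overwrite the other, silently losing an update.
  println(store.get("userX"))
}
```
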
How to escape those race condition scenarios is an interesting topic in itself; however, a deep dive into it is beyond the scope of this post.

One thing worth mentioning, though, is that there are two major approaches to working around them: either design the stream processing system so that all events of a specific user are always processed sequentially by a single worker, or change the design completely into a stateless one, persisting only the state delta changes to the external storage.
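
As a sketch of the first approach: a common way to get per-user sequential processing is to key events by user ID at the ingestion layer. With Kafka, for example, the default partitioner hashes the record key, so all events of one user land in the same partition and are consumed by exactly one worker (the topic name and serializer settings below are illustrative):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

// Using the user ID as the record key: Kafka's default partitioner hashes the
// key, so all events of one user go to the same partition and are therefore
// processed sequentially by whichever worker owns that partition.
def publish(userId: String, eventJson: String): Unit =
  producer.send(new ProducerRecord("ad-view-events", userId, eventJson))
```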

At mobile.de we calculate user profiles on a daily basis, and we store each daily user profile for a period of 60 days. The main motivation behind that decision is to give the various stakeholders the possibility to decide how many days of user profile data they need, as well as the option to apply different decay functions across the profile days in order to boost more recent data. We generate approximately 2 million user profiles each day, and each profile has 20 dimensions (profile dimensions are based on car attributes and could be, for example, price and color). To retrieve a specific profile dimension, the following key combination, which is also the state key, is used: (user ID, date, dimension).
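
In code, such a state key might be modeled as follows (a sketch; the actual schema may differ):

```scala
import java.time.LocalDate

// Composite state key: one state entry per user, day, and profile dimension,
// e.g. ProfileKey("user-42", LocalDate.parse("2017-05-10"), "color").
// In Cassandra terms this could map to a primary key of
// ((user_id), date, dimension) -- partitioned by user, clustered by day/dimension.
case class ProfileKey(userId: String, date: LocalDate, dimension: String)
```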

2M (daily profiles) * 20 (profile dimensions) * 60 (days) = 2.4 billion keys, which is quite a big state. The requirements that we had for the user-profile product were quite clear:

- The user profile must always be served, even if the stream processing system that updates the user profiles in real time is down. We prefer to serve a not-quite-up-to-date user profile rather than not serve it at all.
- The serving layer must be able to scale out with an increasing request rate.
- The state storage must be able to scale out and grow with an increasing state size.
- Additional CPU-intensive calculations and transformations are required as one step before serving the final user profile result.
- The user profile needs to be updated “just in time”, meaning that some latency is tolerable: if the user profile is updated with some delay, for example 30 seconds after the corresponding user interaction event was triggered, that is completely acceptable.

All of that led us to store the stream state in an external storage. We selected Cassandra as the external storage and Akka-http to implement the REST service on top of it.
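
A minimal Akka-http sketch of such a serving layer could look as follows (the Cassandra lookup is stubbed out, and all names are illustrative):

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

object ProfileService extends App {
  implicit val system       = ActorSystem("profile-service")
  implicit val materializer = ActorMaterializer()

  // Stub standing in for the Cassandra read; a real implementation would
  // fetch by (user ID, date, dimension) and run the CPU-intensive
  // post-processing before returning the final profile.
  def fetchProfile(userId: String): String =
    s"""{"userId": "$userId", "profile": {}}"""

  // GET /profiles/<userId> serves the latest stored profile.
  val route =
    path("profiles" / Segment) { userId =>
      get {
        complete(fetchProfile(userId))
      }
    }

  Http().bindAndHandle(route, "0.0.0.0", 8080)
}
```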

To conclude, choosing how to store the stream state is one of the most important decisions in the design of a stateful stream processing system. The right choice should be made only after a thoughtful review of the technical pros and cons of each option and, more importantly, after deeply analyzing and understanding the business requirements.


About the author

Igor Mazor is a Senior Data Engineer at mobile.de, Germany's largest online vehicle marketplace. Since joining the company in 2016, he has been pioneering the implementation of scalable micro-services around data products. Igor’s current focus is on designing and building a real-time infrastructure that can support millions of users per day.