
Author: Jay Kreps, Principal Engineer at LinkedIn. This article comes from a post he shared on LinkedIn. Original title: The Log: What every software engineer should know about real-time data's unifying abstraction.

















Part Four: System Building

The final topic I want to discuss is the role of the log in data system design for online data systems.

There is an analogy here between the role a log serves for data flow inside a distributed database and the role it serves for data integration in a larger organization. In both cases, it is responsible for data flow, consistency, and recovery. What, after all, is an organization, if not a very complicated distributed data system?


So maybe if you squint a bit, you can see the whole of your organization’s systems and data flows as a single distributed database. You can view all the individual query-oriented systems (Redis, SOLR, Hive tables, and so on) as just particular indexes on your data. You can view the stream processing systems like Storm or Samza as just a very well-developed trigger and view materialization mechanism. Classical database people, I have noticed, like this view very much because it finally explains to them what on earth people are doing with all these different data systems—they are just different index types!

There is undeniably now an explosion of types of data systems, but in reality, this complexity has always existed. Even in the heyday of the relational database, organizations had lots and lots of relational databases! So perhaps real integration hasn’t existed since the mainframe when all the data really was in one place. There are many motivations for segregating data into multiple systems: scale, geography, security, and performance isolation are the most common. But these issues can be addressed by a good system: it is possible for an organization to have a single Hadoop cluster, for example, that contains all the data and serves a large and diverse constituency.

So there is already one possible simplification in the handling of data that has become possible in the move to distributed systems: coalescing lots of little instances of each system into a few big clusters. Many systems aren’t good enough to allow this yet: they don’t have security, or can’t guarantee performance isolation, or just don’t scale well enough. But each of these problems is solvable.

My take is that the explosion of different systems is caused by the difficulty of building distributed data systems. By cutting back to a single query type or use case, each system is able to bring its scope down into the set of things that are feasible to build. But running all these systems yields too much complexity.

I see three possible directions this could follow in the future.

The first possibility is a continuation of the status quo: the separation of systems remains more or less as it is for a good deal longer. This could happen either because distribution is too hard to overcome or because this specialization allows new levels of convenience and power for each system. As long as this remains true, the data integration problem will remain one of the most centrally important things for the successful use of data. In this case, an external log that integrates data will be very important.

The second possibility is that there could be a re-consolidation in which a single system with enough generality starts to merge all the different functions back into a single uber-system. This uber-system could be like the relational database superficially, but its use in an organization would be far different, as you would need only one big one instead of umpteen little ones. In this world, there is no real data integration problem except what is solved inside this system. I think the practical difficulties of building such a system make this unlikely.

There is another possible outcome, though, which I actually find appealing as an engineer. One interesting facet of the new generation of data systems is that they are virtually all open source. Open source allows another possibility: data infrastructure could be unbundled into a collection of services and application-facing system APIs. You already see this happening to a certain extent in the Java stack:

  • ZooKeeper handles much of the system coordination (perhaps with a bit of help from higher-level abstractions like Helix or Curator).
  • Mesos and YARN do process virtualization and resource management.
  • Embedded libraries like Lucene and LevelDB do indexing.
  • Netty, Jetty, and higher-level wrappers like Finagle and rest.li handle remote communication.
  • Avro, Protocol Buffers, Thrift, and umpteen zillion other libraries handle serialization.
  • Kafka and BookKeeper provide a backing log.

If you stack these things in a pile and squint a bit, it starts to look a bit like a lego version of distributed data system engineering. You can piece these ingredients together to create a vast array of possible systems. This is clearly not a story relevant to end-users, who presumably care more about the API than how it is implemented, but it might be a path towards getting the simplicity of the single system in a more diverse and modular world that continues to evolve. If the implementation time for a distributed system goes from years to weeks because reliable, flexible building blocks emerge, then the pressure to coalesce into a single monolithic system disappears.

The place of the log in system architecture

A system that assumes an external log is present allows the individual systems to relinquish a lot of their own complexity and rely on the shared log. Here are the things I think a log can do:

  • Handle data consistency (whether eventual or immediate) by sequencing concurrent updates to nodes.
  • Provide data replication between nodes.
  • Provide “commit” semantics to the writer (i.e. acknowledging only when your write is guaranteed not to be lost).
  • Provide the external data subscription feed from the system.
  • Provide the capability to restore failed replicas that lost their data or bootstrap new replicas.
  • Handle rebalancing of data between nodes.

This is actually a substantial portion of what a distributed data system does. In fact, the majority of what is left over is related to the final client-facing query API and indexing strategy. This is exactly the part that should vary from system to system: for example, a full-text search query may need to query all partitions whereas a query by primary key may only need to query a single node responsible for that key’s data.
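The difference between the two query shapes can be sketched as follows. This is a minimal illustration, not any particular system's API: the names `partition_for`, `put`, `get_by_key`, and `search`, the partition count, and the use of CRC32 for hashing are all assumptions made for the example.

```python
import zlib
from typing import Dict, List, Optional

NUM_PARTITIONS = 3  # assumed partition count for the sketch
partitions: List[Dict[str, str]] = [{} for _ in range(NUM_PARTITIONS)]


def partition_for(key: str) -> int:
    # Stable hash so every caller routes a given key to the same partition.
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


def put(key: str, text: str) -> None:
    partitions[partition_for(key)][key] = text


def get_by_key(key: str) -> Optional[str]:
    # Primary-key lookup: only the one partition that owns the key is queried.
    return partitions[partition_for(key)].get(key)


def search(word: str) -> List[str]:
    # Full-text query: scatter to every partition, then gather the results.
    hits: List[str] = []
    for part in partitions:
        hits.extend(k for k, text in part.items() if word in text.split())
    return sorted(hits)


put("doc1", "the log is the source of truth")
put("doc2", "an index is derived from the log")
put("doc3", "caches can be rebuilt")
```

Here `get_by_key("doc2")` touches a single partition, while `search("log")` has to visit all of them before it can answer.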

Here is how this works. The system is divided into two logical pieces: the log and the serving layer. The log captures the state changes in sequential order. The serving nodes store whatever index is required to serve queries (for example, a key-value store might have something like a btree or sstable, while a search system would have an inverted index). Writes may go directly to the log, though they may be proxied by the serving layer. Writing to the log yields a logical timestamp (say, the index in the log). If the system is partitioned, and I assume it is, then the log and the serving nodes will have the same number of partitions, though they may have very different numbers of machines.
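As a rough sketch of the write path described above, assuming an in-memory log and a hypothetical `PartitionedLog` class (neither is taken from Kafka or any real system), an append can return the partition and offset as the logical timestamp:

```python
import zlib
from dataclasses import dataclass, field
from typing import Any, List, Tuple


@dataclass
class PartitionedLog:
    """Append-only log split into a fixed number of partitions (illustrative only)."""
    num_partitions: int
    partitions: List[List[Tuple[str, Any]]] = field(init=False)

    def __post_init__(self) -> None:
        self.partitions = [[] for _ in range(self.num_partitions)]

    def partition_for(self, key: str) -> int:
        # Stable hash so writers and serving nodes agree on the partition.
        return zlib.crc32(key.encode("utf-8")) % self.num_partitions

    def append(self, key: str, value: Any) -> Tuple[int, int]:
        """Record a state change; the (partition, offset) pair is the logical timestamp."""
        p = self.partition_for(key)
        self.partitions[p].append((key, value))
        return p, len(self.partitions[p]) - 1


log = PartitionedLog(num_partitions=4)
p1, off1 = log.append("user:1", {"name": "Ada"})
p2, off2 = log.append("user:1", {"name": "Ada L."})
```

Both writes for `user:1` land in the same partition, and the second receives the next offset, so updates to a key are totally ordered within its partition.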

The serving nodes subscribe to the log and apply writes as quickly as possible to their local indexes, in the order the log has stored them.
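A minimal sketch of that subscription loop, assuming a single unpartitioned log held in a Python list and a hypothetical `ServingNode` class with a pull-based `poll` method:

```python
from typing import Any, Dict, List, Tuple

Record = Tuple[str, Any]  # (key, value) state change


class ServingNode:
    """Keeps a local key-value index derived purely from a prefix of the log."""

    def __init__(self, log: List[Record]) -> None:
        self.log = log
        self.index: Dict[str, Any] = {}
        self.next_offset = 0  # offset of the first record not yet applied

    def poll(self) -> int:
        """Apply every unseen record in log order; return how many were applied."""
        applied = 0
        while self.next_offset < len(self.log):
            key, value = self.log[self.next_offset]
            self.index[key] = value
            self.next_offset += 1
            applied += 1
        return applied


log: List[Record] = [("a", 1), ("b", 2), ("a", 3)]
node_1, node_2 = ServingNode(log), ServingNode(log)
node_1.poll()          # node_1 catches up early
log.append(("b", 4))   # a later write arrives
node_1.poll()
node_2.poll()          # node_2 catches up in one go
```

Because both nodes apply the same records in the same order, they converge on the same index regardless of when each one polls.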

The client can get read-your-write semantics from any node by providing the timestamp of a write as part of its query—a serving node receiving such a query will compare the desired timestamp to its own index point and if necessary delay the request until it has indexed up to at least that time to avoid serving stale data.
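One way to picture this read-your-writes check is the sketch below. It assumes a single-process setup in which "delaying" a request simply means applying more of the log before answering; the `ServingNode`, `NotCaughtUp`, and `write` names are invented for the example.

```python
from typing import Any, Dict, List, Optional, Tuple

Record = Tuple[str, Any]


class NotCaughtUp(Exception):
    """Raised when the log itself does not yet contain the requested offset."""


class ServingNode:
    def __init__(self, log: List[Record]) -> None:
        self.log = log
        self.index: Dict[str, Any] = {}
        self.next_offset = 0  # offset of the first record not yet indexed

    def read(self, key: str, min_offset: int = -1) -> Optional[Any]:
        # "Delay" the request by indexing until min_offset has been applied.
        while self.next_offset <= min_offset:
            if self.next_offset >= len(self.log):
                raise NotCaughtUp(f"log has no offset {min_offset} yet")
            k, v = self.log[self.next_offset]
            self.index[k] = v
            self.next_offset += 1
        return self.index.get(key)


log: List[Record] = []


def write(key: str, value: Any) -> int:
    """Append to the log and return the offset as the logical timestamp."""
    log.append((key, value))
    return len(log) - 1


node = ServingNode(log)
ts = write("profile:7", "v1")
stale = node.read("profile:7")                 # no timestamp: may miss the write
fresh = node.read("profile:7", min_offset=ts)  # waits until offset ts is indexed
```

A real serving node would more likely block or retry than index inline, but the comparison between the requested timestamp and the node's own index point is the same.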

The serving nodes may or may not need to have any notion of “mastership” or “leader election”. For many simple use cases, the serving nodes can be completely without leaders, since the log is the source of truth.

One of the trickier things a distributed system must do is handle restoring failed nodes or moving partitions from node to node. A typical approach would have the log retain only a fixed window of data and combine this with a snapshot of the data stored in the partition. It is equally possible for the log to retain a complete copy of data and garbage collect the log itself. This moves a significant amount of complexity out of the serving layer, which is system-specific, and into the log, which can be general purpose.
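The snapshot-plus-window approach can be sketched as follows, assuming a hypothetical `WindowedLog` that discards records older than a chosen offset and a `Snapshot` that remembers the offset at which it was taken:

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple

Record = Tuple[str, Any]


@dataclass
class Snapshot:
    index: Dict[str, Any]
    next_offset: int  # first log offset NOT reflected in the snapshot


class WindowedLog:
    """Log that retains only records at or after base_offset."""

    def __init__(self) -> None:
        self.base_offset = 0
        self.records: List[Record] = []

    def append(self, record: Record) -> int:
        self.records.append(record)
        return self.base_offset + len(self.records) - 1

    def truncate_before(self, offset: int) -> None:
        drop = offset - self.base_offset
        if drop > 0:
            del self.records[:drop]
            self.base_offset = offset

    def read_from(self, offset: int) -> List[Record]:
        if offset < self.base_offset:
            raise ValueError("offset already truncated; a newer snapshot is needed")
        return self.records[offset - self.base_offset:]


def restore(snapshot: Snapshot, log: WindowedLog) -> Tuple[Dict[str, Any], int]:
    """Rebuild a node's index from a snapshot plus the retained log tail."""
    index = dict(snapshot.index)
    next_offset = snapshot.next_offset
    for key, value in log.read_from(next_offset):
        index[key] = value
        next_offset += 1
    return index, next_offset


log = WindowedLog()
for rec in [("a", 1), ("b", 2), ("a", 3)]:
    log.append(rec)
snap = Snapshot(index={"a": 1, "b": 2}, next_offset=2)  # taken after two records
log.truncate_before(2)                                  # the window moves forward
log.append(("c", 4))
restored_index, restored_offset = restore(snap, log)
```

The restore only works if the snapshot is at least as recent as the start of the retained window, which is why `read_from` refuses offsets that have already been truncated.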

By having this log system, you get a fully developed subscription API for the contents of the data store which feeds ETL into other systems. In fact, many systems can share the same log while providing different indexes, like this:

Note how such a log-centric system is itself immediately a provider of data streams for processing and loading in other systems. Likewise, a stream processor can consume multiple input streams and then serve them via another system that indexes that output.
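The idea of several systems sharing one log while maintaining different indexes can be illustrated with a small sketch. The `KeyValueIndex` and `InvertedIndex` classes and the sample records are assumptions for the example, not a description of any specific system.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

Record = Tuple[str, str]  # (document id, full text)


class KeyValueIndex:
    def __init__(self) -> None:
        self.data: Dict[str, str] = {}

    def apply(self, doc_id: str, text: str) -> None:
        self.data[doc_id] = text


class InvertedIndex:
    def __init__(self) -> None:
        self.postings: Dict[str, Set[str]] = defaultdict(set)
        self.docs: Dict[str, str] = {}

    def apply(self, doc_id: str, text: str) -> None:
        # Remove postings for the previous version before indexing the new one.
        for word in self.docs.get(doc_id, "").split():
            self.postings[word].discard(doc_id)
        self.docs[doc_id] = text
        for word in text.split():
            self.postings[word].add(doc_id)

    def search(self, word: str) -> List[str]:
        return sorted(self.postings.get(word, set()))


log: List[Record] = [
    ("d1", "the log is truth"),
    ("d2", "index the log"),
    ("d1", "the cache is derived"),  # later update to d1
]

kv, inverted = KeyValueIndex(), InvertedIndex()
for doc_id, text in log:  # both systems consume the same feed in the same order
    kv.apply(doc_id, text)
    inverted.apply(doc_id, text)
```

Neither index needs its own write API here: each is just a different projection of the same ordered sequence of changes.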

I find this view of systems as factored into a log and a query API to be very revealing, as it lets you separate the query characteristics from the availability and consistency aspects of the system. I actually think this is even a useful way to mentally factor a system that isn’t built this way to better understand it.

It’s worth noting that although Kafka and BookKeeper are consistent logs, this is not a requirement. You could just as easily factor a Dynamo-like database into an eventually consistent AP log and a key-value serving layer. Such a log is a bit tricky to work with, as it will redeliver old messages and depends on the subscriber to handle this (much like Dynamo itself).
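To make the redelivery problem concrete, here is a minimal sketch of a subscriber that tolerates repeated and out-of-order messages. It assumes each message carries a per-key version number, which is a deliberate simplification (Dynamo itself uses vector clocks); the `apply` function and message layout are hypothetical.

```python
from typing import Any, Dict, List, Tuple

Message = Tuple[str, int, Any]  # (key, version, value); version is an assumed field


def apply(index: Dict[str, Tuple[int, Any]], message: Message) -> bool:
    """Apply a message only if it is newer than what is stored; return True if applied."""
    key, version, value = message
    current = index.get(key)
    if current is None or version > current[0]:
        index[key] = (version, value)
        return True
    return False  # duplicate or stale redelivery, safely ignored


deliveries: List[Message] = [
    ("k", 1, "a"),
    ("k", 2, "b"),
    ("k", 1, "a"),  # old message redelivered
    ("k", 2, "b"),  # duplicate of the latest message
]

index: Dict[str, Tuple[int, Any]] = {}
results = [apply(index, m) for m in deliveries]
```

Because applying a message is idempotent, the subscriber ends up in the same state no matter how many times the log redelivers old entries.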

The idea of having a separate copy of data in the log (especially if it is a complete copy) strikes many people as wasteful. In reality, though, there are a few factors that make this less of an issue. First, the log can be a particularly efficient storage mechanism. We store over 75TB per datacenter on our production Kafka servers. Meanwhile, many serving systems require much more memory to serve data efficiently (text search, for example, is often all in memory). The serving system may also use optimized hardware. For example, most of our live data systems either serve out of memory or else use SSDs. In contrast, the log system does only linear reads and writes, so it is quite happy using large multi-TB hard drives. Finally, as in the picture above, in the case where the data is served by multiple systems, the cost of the log is amortized over multiple indexes. This combination makes the expense of an external log pretty minimal.

This is exactly the pattern that LinkedIn has used to build out many of its own real-time query systems. These systems feed off a database (using Databus as a log abstraction) or off a dedicated log from Kafka, and provide a particular partitioning, indexing, and query capability on top of that data stream. This is the way we have implemented our search, social graph, and OLAP query systems. In fact, it is quite common to have a single data feed (whether a live feed or a derived feed coming from Hadoop) replicated into multiple serving systems for live serving. This has proven to be an enormous simplifying assumption. None of these systems need to have an externally accessible write API at all; Kafka and databases are used as the system of record, and changes flow to the appropriate query systems through that log. Writes are handled locally by the nodes hosting a particular partition. These nodes blindly transcribe the feed provided by the log to their own store. A failed node can be restored by replaying the upstream log.

The degree to which these systems rely on the log varies. A fully reliant system could make use of the log for data partitioning, node restore, rebalancing, and all aspects of consistency and data propagation. In this setup, the serving tier is essentially a sort of “cache”, structured to enable a particular type of processing, with writes going directly to the log.

Original author: Jay Kreps. Translators: LitStone, super0555, 几点人, cmy00cmy, tnjin, 928171481, 黄劼, and others. Source: 开源中国 (OSChina).



