Author: Jay Kreps, principal engineer at LinkedIn. This article comes from a post he shared on LinkedIn; the original title is The Log: What every software engineer should know about real-time data's unifying abstraction.
The article is packed with substance and well worth studying. It is presented in four parts; we recommend reading it through to the end.
Part One: What is a log?
Part Two: Data integration
Part Three: Logs and real-time stream processing
Part Four: System building
Part Four: System Building
The final topic I want to discuss is the role of the log in the design of online data systems.
There is an analogy between the role a log serves for data flow inside a distributed database and the role it serves for data integration across a larger organization. In both cases, it is responsible for data flow, consistency, and recovery. What, after all, is an organization, if not a very complicated distributed data system?
Unbundling?
If you squint a bit, you can see the whole of your organization's systems and data flows as a single distributed database. You can view all the individual query-oriented systems (Redis, SOLR, Hive tables, and so on) as just particular indexes on your data. You can view stream processing systems like Storm or Samza as a very well-developed trigger and view-materialization mechanism. Classical database people, I have noticed, like this view very much, because it finally explains what all these different data systems are for: they are just different index types!
There is undeniably an explosion of types of data systems now, but in reality this complexity has always existed. Even in the heyday of the relational database, organizations had lots and lots of relational databases! So perhaps real integration has not existed since the mainframe era, when all the data really was in one place. There are many motivations for segregating data into multiple systems: scale, geography, security, and performance isolation are the most common. But these issues can be addressed by a good system: it is possible, for example, for an organization to have a single Hadoop cluster that contains all the data and serves a large and diverse constituency.
So there is already one simplification in the handling of data that has become possible in the move to distributed systems: coalescing lots of little instances of each system into a few big clusters. Many systems are not yet good enough to allow this: they lack security, or cannot guarantee performance isolation, or simply do not scale well enough. But each of these problems is solvable.
My take is that the explosion of different systems is caused by the difficulty of building distributed data systems. By cutting back to a single query type or use case, each system is able to bring its scope down to a set of things that are feasible to build. But running all these systems still yields too much complexity.
I see three possible directions this could take in the future.
The first possibility is a continuation of the status quo: the separation of systems remains more or less as it is for a good deal longer. This could happen either because the difficulty of distribution is too hard to overcome, or because this specialization gives each system a level of convenience and power that is hard to match. As long as this remains true, the data integration problem will remain one of the most centrally important things for the successful use of data, and an external log that integrates data will be very important.
The second possibility is a re-consolidation, in which a single system with enough generality merges all the different functions back into a single uber-system. This uber-system could look superficially like a relational database, but the biggest difference in how an organization would use it is that you would need only one big one instead of umpteen little ones. In this world, there is no real data integration problem except what is solved inside the system. I think the practical difficulties of building such a system make this unlikely.
There is another possible outcome, though, which I actually find appealing as an engineer. One interesting facet of the new generation of data systems is that they are virtually all open source. Open source allows another possibility: data infrastructure could be unbundled into a collection of services and application-facing system APIs. You can already see this happening to a certain extent in the Java stack:
- Zookeeper handles much of the coordination between systems (perhaps with a bit of help from higher-level abstractions like Helix or Curator).
- Mesos and YARN do process virtualization and resource management.
- Embedded libraries like Lucene and LevelDB do indexing.
- Netty, Jetty, and higher-level wrappers like Finagle and rest.li handle remote communication.
- Avro, Protocol Buffers, Thrift, and umpteen zillion other libraries handle serialization.
- Kafka and BookKeeper provide a backing log.
If you stack these things in a pile and squint a bit, it starts to look like a Lego version of distributed data system engineering. You can piece these ingredients together to create a vast array of possible systems. This is clearly not a story for end-users, who presumably care more about the API than about how it is implemented, but it might be a path toward getting the simplicity of a single system in a more diverse and modular world that continues to evolve. If the implementation time for a distributed system goes from years to weeks because reliable, flexible building blocks emerge, the pressure to coalesce into a single monolithic system disappears.
The place of the log in system architecture
A system that assumes an external log is present allows the individual systems to shed a lot of their own complexity and rely on the shared log. Here are the things I think a log can do:
- Handle data consistency (whether eventual or immediate) by sequencing concurrent updates to nodes
- Provide data replication between nodes
- Provide "commit" semantics to the writer (i.e. acknowledging only when the write is guaranteed not to be lost)
- Provide the external data subscription feed from the system
- Provide the capability to restore failed replicas that lost their data, or to bootstrap new replicas
- Handle rebalancing of data between nodes
This is actually a substantial portion of what a distributed data system does. In fact, most of what is left over relates to the final client-facing query API and the indexing strategy. That is exactly the part that should vary from system to system: for example, a full-text search query may need to hit all partitions, whereas a query by primary key only needs to hit the single node responsible for that key's data.
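To make this division of labor concrete, here is a minimal sketch, in Java, of the log abstraction a serving system could build against. Everything here (SharedLog, LogEntry, the method names) is hypothetical, invented for illustration rather than taken from Kafka's or BookKeeper's actual APIs; each method corresponds to one of the responsibilities in the list above.

```java
import java.util.List;

/**
 * A minimal shared-log abstraction. Names and signatures are
 * illustrative only, not any real project's API.
 */
interface SharedLog {
    /**
     * Append an entry. Blocks until the entry is durably replicated,
     * then returns its offset: this is the "commit" semantics, and the
     * offset doubles as a logical timestamp for the write.
     */
    long append(byte[] entry);

    /**
     * Read a batch of entries starting at the given offset. Replicas
     * replay from offset 0 (or from a snapshot's offset) to bootstrap,
     * and external subscribers use the same call as a change feed.
     */
    List<LogEntry> readFrom(long offset, int maxEntries);

    /** Offset one past the last committed entry. */
    long endOffset();
}

/** An entry plus the offset the log assigned to it. */
record LogEntry(long offset, byte[] payload) {}
```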
Here is how this works. The system is divided into two logical pieces: the log and the serving layer. The log captures state changes in sequential order. The serving nodes store whatever index is required to serve queries (for example, a key-value store might use something like a B-tree or an SSTable, while a search system would use an inverted index). Writes go to the log, though they may be proxied by the serving layer. Writing to the log yields a logical timestamp (say, the index of the entry in the log). If the system is partitioned, and I assume it is, then the log and the serving nodes will have the same number of partitions, though these may be spread over very different numbers of machines.
The serving nodes subscribe to the log and apply writes to their local indexes as quickly as possible, in the order the log has stored them.
A client can get read-your-writes semantics from any node by providing the timestamp of its write as part of its query: a serving node receiving such a query compares the requested timestamp to its own index point and, if necessary, delays the request until it has indexed up to at least that time, so as not to serve stale data.
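Continuing the hypothetical sketch above, a serving node then amounts to two pieces: a loop that tails the SharedLog into a local index, and a read path that implements the read-your-writes wait just described. A real implementation would use condition variables or a push-based subscription instead of polling, but the ordering logic is the essential part.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of a serving node for one partition; illustrative only. */
class ServingNode {
    final SharedLog log;
    final Map<String, String> index = new ConcurrentHashMap<>();
    volatile long appliedOffset = -1;  // high-water mark of the local index

    ServingNode(SharedLog log) { this.log = log; }

    /** Tail the log, applying writes to the local index in log order. */
    void run() throws InterruptedException {
        while (true) {
            for (LogEntry e : log.readFrom(appliedOffset + 1, 100)) {
                String[] kv = new String(e.payload()).split("=", 2);
                index.put(kv[0], kv[1]);
                appliedOffset = e.offset();   // advance only after applying
            }
            Thread.sleep(10);                 // poll; a real node would block
        }
    }

    /**
     * Read-your-writes: the client passes the offset its write committed
     * at; we wait until the index has caught up before answering.
     */
    String get(String key, long minOffset) throws InterruptedException {
        while (appliedOffset < minOffset) {
            Thread.sleep(1);                  // or park on a condition variable
        }
        return index.get(key);
    }
}
```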
The serving nodes may or may not need any notion of "mastership" or "leader election". For many simple use cases the serving nodes can operate entirely without leaders, since the log is the source of truth.
One of the trickier things a distributed system must do is restore failed nodes and move partitions from node to node. A typical approach is to have the log retain only a fixed window of data and combine this with a snapshot of the data stored in each partition. It is equally possible for the log to retain a complete copy of the data and to garbage-collect the log itself. Either way, this moves a significant amount of complexity out of the serving layer, which is system-specific, and into the log, which can be general-purpose.
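Under either retention scheme, restoring a node reduces to "load the newest snapshot, then replay the tail of the log". A sketch, again with hypothetical types (SnapshotStore is assumed, alongside SharedLog and LogEntry from the earlier sketch):

```java
import java.util.HashMap;
import java.util.Map;

/** Snapshot of one partition's index up to and including a log offset. */
record Snapshot(long offset, Map<String, String> entries) {}

interface SnapshotStore { Snapshot latest(); }

class Restorer {
    /**
     * Rebuild a node's index: start from the newest snapshot, then
     * replay only the log entries that came after it.
     */
    static Map<String, String> restore(SharedLog log, SnapshotStore snapshots) {
        Snapshot s = snapshots.latest();
        Map<String, String> index = new HashMap<>(s.entries());
        long next = s.offset() + 1;
        while (next < log.endOffset()) {
            for (LogEntry e : log.readFrom(next, 100)) {
                String[] kv = new String(e.payload()).split("=", 2);
                index.put(kv[0], kv[1]);
                next = e.offset() + 1;
            }
        }
        return index;  // hand off to a ServingNode that resumes at `next`
    }
}
```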
By having this log system, you also get a fully developed subscription API to the contents of the data store, which can feed ETL into other systems. In fact, many systems can share the same log while providing different indexes on it.
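With Kafka as the log, that subscription feed is just an ordinary consumer on the change-log topic. Here is a minimal sketch using the Java KafkaConsumer API (which, for what it's worth, postdates this article); the topic name and group id are made up. Each derived system would run its own consumer group over the same topic.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/** Tail a change-log topic and feed another system (ETL sketch). */
public class ChangeFeed {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "search-indexer");  // one group per derived index
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("profile-changes"));  // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> r : records) {
                    // Apply each change to the downstream index or store here.
                    System.out.printf("offset=%d key=%s value=%s%n",
                                      r.offset(), r.key(), r.value());
                }
            }
        }
    }
}
```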
Note how such a log-centric system is itself immediately a provider of data streams for processing and loading into other systems. Likewise, a stream processor can consume multiple input streams and then serve the results via another system that indexes its output.
I find this view of systems as factored into a log and a query API very revealing, because it lets you separate the query characteristics from the availability and consistency aspects of the system. I actually think it is a useful way to mentally factor even a system that was not built this way, the better to understand it.
It is worth noting that although Kafka and BookKeeper are consistent logs, this is not a requirement. You could just as easily factor a Dynamo-like database into an eventually consistent AP log and a key-value serving layer. Such a log is a bit tricky to work with, because it can redeliver old messages and depends on the subscriber to handle this (much like Dynamo itself).
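One way a subscriber copes with such an AP log is to make its apply step idempotent, for example by attaching a version to each key and discarding anything stale. Below is a sketch that assumes each message carries a monotonically comparable version number; in a real Dynamo-style system this would be a vector clock, and conflict resolution correspondingly richer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Idempotent apply for an AP log that may redeliver old messages. */
class EventuallyConsistentIndex {
    /** A value plus the version it carried in the log. */
    record Versioned(long version, String value) {}

    private final Map<String, Versioned> index = new ConcurrentHashMap<>();

    /**
     * Applying the same message twice, or out of order, is harmless:
     * a write only lands if its version is newer than what we have.
     */
    void apply(String key, long version, String value) {
        index.merge(key, new Versioned(version, value),
                (old, neu) -> neu.version() > old.version() ? neu : old);
    }

    String get(String key) {
        Versioned v = index.get(key);
        return v == null ? null : v.value();
    }
}
```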
The idea of keeping a separate copy of the data in the log (especially a complete copy) strikes many people as wasteful. In reality, though, a few factors make this less of an issue. First, the log can be a particularly efficient storage mechanism: we store over 75 TB per datacenter on our production Kafka servers. Meanwhile, many serving systems require much more memory to serve data efficiently (text search, for example, is often entirely in memory). The serving system may also need optimized hardware; most of our live data systems either serve out of memory or use SSDs. In contrast, the log system does only linear reads and writes, so it is quite happy using large multi-terabyte hard drives. Finally, where the data is served by multiple systems, the cost of the log is amortized over multiple indexes. This combination makes the expense of an external log fairly minimal.
This is exactly the pattern LinkedIn has used to build out many of its own real-time query systems. These systems feed off a database (using Databus as a log abstraction, or off a dedicated log from Kafka) and provide a particular partitioning, indexing, and query capability on top of that data stream. This is how we have implemented our search, social graph, and OLAP query systems. In fact, it is quite common to have a single data feed (whether a live feed or a derived feed coming from Hadoop) replicated into multiple serving systems for live serving, and this has proven to be an enormously simplifying assumption. None of these systems need an externally accessible write API at all: Kafka and the databases serve as the system of record, and changes flow to the appropriate query systems through that log. Writes are handled locally by the nodes hosting a particular partition; those nodes blindly transcribe the feed provided by the log into their own stores. A failed node can be restored by replaying the upstream log.
The degree to which these systems rely on the log varies. A fully reliant system could use the log for data partitioning, node restore, load rebalancing, and all aspects of consistency and data propagation. In such a setup, the serving tier is really nothing more than a kind of "cache" structured to enable a particular type of processing, with writes going directly to the log.
Closing remarks
If you have followed most of what this article says about logs, here is some further reading you may find useful. People use different terms for the same things, which can be confusing: the database literature, distributed systems, the various enterprise software camps, and the broader open source world each have their own vocabulary. Nonetheless, things point in the same general direction.
Academic papers, systems, talks, and blogs:
- An overview of state machine and primary-backup replication.
- PacificA is a generic framework for implementing log-based distributed storage systems at Microsoft.
- Spanner: not everyone loves logical time for their logs. Google's newest database instead tries physical time, and models the uncertainty of clock drift directly by treating timestamps as intervals.
- Datomic: Deconstructing the Database is an important talk by Rich Hickey, the creator of Clojure, on his database product.
- A survey of rollback-recovery protocols in message-passing systems. I found it a helpful introduction to fault tolerance and to applying logs to recovery outside of databases.
- The Reactive Manifesto: I am actually not sure exactly what "reactive programming" means, but I suspect it means the same thing as "event driven". The link does not contain much information, but the course taught by the renowned Martin Odersky looks fascinating.
- Paxos!
1) Leslie Lamport has an interesting history of how the algorithm was discovered in the 1980s but not published until 1998, because the reviewers disliked the Greek parable in the paper and the author refused to change it.
2) Even after the paper was published, it was not well understood. Lamport tried again, this time including a few of the "uninteresting details" of how to put the algorithm to use on these new-fangled automatic computers. It still was not widely understood.
3) Fred Schneider and Butler Lampson each give more detail on applying Paxos in real systems.
4) A few Google engineers summarize their experience implementing Paxos in Chubby.
5) I found all the Paxos papers painful to understand, though they are worth the struggle. You can spare yourself the pain, because this video by John Ousterhout, the master of log-structured file systems, makes it all quite easy. These consensus algorithms come across far better as unfolding rounds of communication than as static descriptions in a paper. Somewhat ironically, the video was recorded in an attempt to show that Paxos is hard to understand.
6) Using Paxos to build a scalable, consistent data store.
Paxos has plenty of competitors. Each of the following maps more closely to the implementation of a log and is better suited to practical implementation:
1) Viewstamped Replication, by Barbara Liskov, is an early algorithm that directly models log replication.
2) Zab is the algorithm used by Zookeeper.
3) RAFT is one of the easier consensus algorithms to understand. The video lecture, also by John Ousterhout, is excellent.
You can see the role of the log in action in various real distributed databases:
- 1) PNUTS is a system that explores applying a log-centric design to a traditional distributed database at large scale.
- 2) HBase and Bigtable are both examples of logs in modern database systems.
- 3) LinkedIn's own distributed database, Espresso, uses a log for replication just as PNUTS does, with the small difference that it uses its own underlying table as the source of the log.
Stream processing: the topic is too broad to summarize here, but a few things I pay attention to are worth mentioning:
- 1) TelegraphCQ
- 2) Aurora
- 3) NiagaraCQ
- 4) Discretized Streams: this paper discusses Spark's streaming system.
- 5) MillWheel, one of Google's stream processing systems.
- 6) Naiad: a timely dataflow system.
- 7) Models and Issues in Data Stream Systems: probably one of the best overviews of research in this area.
- 8) High-availability algorithms for distributed stream processing.
Enterprise software has all the same problems, just under different names, at a smaller scale, and in XML. Ha, just kidding.
Event sourcing: as far as I can tell, it is what enterprise application engineers mean when they say "state machine replication". It is interesting that the same idea gets applied in such different settings. Event sourcing focuses on small, in-memory use cases, and in application development it seems to couple the "stream processing" that happens on the event log with the application itself. What makes it non-trivial is scale: once processing is large enough to require data partitioning, I focus on stream processing as a separate, first-class piece of infrastructure.
Change data capture: there is a small industry around getting data out of databases, and it is the most log-friendly style of data extraction.
Enterprise application integration seems to address the data integration problem when what you have is a collection of off-the-shelf software such as CRM (customer relationship management) or SCM (supply chain management) products.
Complex event processing (CEP): no one seems to know exactly what it means or how it differs from stream processing. The differences seem to center on unordered streams and on event filtering, detection, or aggregation, but in my view the distinction is not clear-cut; I think each system has its own strengths.
Enterprise service bus (ESB): I think the ESB concept is similar to what I have described here around data integration. The idea has been moderately successful in the enterprise software community, but remains largely unknown to engineers working on web-scale or distributed data infrastructure.
Some related open source projects:
- Kafka is a "log as a service" project, and the foundation for several of the items listed below.
- BookKeeper and Hedwig are two more open source "log as a service" projects. They are more focused on data system internals than on event data.
- Databus is a system that provides a log-like overlay on top of database tables.
- Akka is an actor framework for Scala. It has an add-on that provides persistence and journaling.
- Samza is the stream processing framework we use at LinkedIn. It embodies many of the ideas in this article and integrates with Kafka as the underlying log.
- Storm is one of the most widely used stream processing frameworks and integrates well with Kafka.
- Spark Streaming is a stream processing framework that is part of Spark.
- Summingbird is a layer on top of Storm or Hadoop that provides a convenient computing abstraction.
I try to keep up with this area, so if you know of something I have missed, please let me know. Thank you.
Original author: Jay Kreps. Translated by LitStone, super0555, 几点人, cmy00cmy, tnjin, 928171481, 黄劼, and others. Source: OSChina (开源中国).
End.