工程师手记:Cassandra在风控数据处理中的应用实践

2018-08-15|风控实验室 716

Cassandra是一套开源分布式NoSQL数据库系统。由Facebook开发,主要用于储存收件箱等简单格式数据,集GoogleBigTable的数据模型与Amazon Dynamo的完全分布式的架构于一身。

undefined

2008年,Facebook将 Cassandra 开源,并被Digg、Twitter等知名公司引入,成为了一种流行的分布式结构化数据存储方案。

Cassandra是一个混合型的非关系的数据库,类似于Google的BigTable。其主要功能比Dynamo (分布式的Key-Value存储系统)更丰富,但支持度却不如文档存储MongoDB(介于关系数据库和非关系数据库之间的开源产品,是非关系数据库当中功能最丰富,最像关系数据库的。支持的数据结构非常松散,是类似json的bjson格式,因此可以存储比较复杂的数据类型)。

undefined

Cassandra的应用场景

Cassandra具有常见NoSQL 分布式数据库,其自带的命令行工具完备,兼容性强,甚至Windows机器都可以安装,因此维护、升级都相对比较简单。它有着很人性化的Web管理界面,兼容大部分SQL语法。可以设置多个“中心”,这一点跟Hadoop based的HBase / Hive 很不一样。

Cassandra读写性能和可扩展非常好。由于它是一堆数据库节点共同构成的一个分布式网络服务,只要对Cassandra 的一个节点就行写操作,会被复制到其他节点上去。同理,对Cassandra的一个节点进行读操作,也会被路由到某个节点上面去读取。因此,对于一个Cassandra群集来说,扩展性能是比较简单的事情,只管在群集里面添加节点就可以了。非常适合金融、电商等记录系统日志、产品的目录管理、实时数据存储等等要求高的行业。

我们把Cassandra应用在了数据对象服务的数据提供上,由于有多个“中心”,又减少了对于Hadoop环境的依赖,在维护管理上会非常便捷,又能大幅提升数据的存储以及查询性能。

Cassandra docker安装初体验

启动Cassandra docker实例:

$ docker run --name some-cassandra -d cassandra:tag

some-cassandra指定的容器名称,tag是指定所需Cassandra版本的标记。具体参考docker hub。

建立集群以及在已有集群上扩展机器:

假设第一台机器的IP地址是10.42.42.42第二台机器人的IP地址10.43.43.43,请使用公开的八卦端口启动第一台机器:

$ docker run --name some-cassandra -d -e CASSANDRA_BROADCAST_ADDRESS=10.42.42.42 -p 7000:7000 cassandra:tag

然后在第二台机器上启动一个Cassandra容器,暴露的八卦端口和种子指向第一台机器:

$ docker run --name some-cassandra -d -e CASSANDRA_BROADCAST_ADDRESS=10.43.43.43 -p 7000:7000 -e CASSANDRA_SEEDS=10.42.42.42 cassandra:tag

这样就建立好了一个俩台Cassandra机器的集群了,可以进行cqlsh的体验了。

Cassandra应用实践

Cassandra没有像BigTable或Hbase那样选择中心控制节点,而选择了无中心的P2P架构,网络中的所有节点都是对等的,它们构成了一个环,节点之间通过P2P协议每秒钟交换一次数据,这样每个节点都拥有其它所有节点的信息,包括位置、状态等。

undefined

客户端可以连接集群中的任一个节点来连接整个集群,和客户端建立连接的节点叫协作者(coordinator),Cassandra相当于一个代理,负责定位该次请求要发到哪些实际拥有本次请求所需数据的节点上去获取,但如何获取并返回,主要根据客户端要求的一致性级别(Consistency Level)来确定。

比如:ONE指只要有一个节点返回数据就可以对客户端做出响应,QUONUM指需要返回几个根据用户的配置数目,ALL指等于数据复制份数的所有节点都返回结果才能向客户端做出响应,对于数据一致性要求不是特别高的可以选择ONE,这是最快的一种方式。

Cassandra的数据分发和复制通常是一起的,数据用表的形式来组织,用主键来识别应该存储到哪些节点上,行的copy称作replica。当一个集群被创建时,至少要指定如下几个配置:Virtual Nodes,Partitioner,Replication Strategy,Snitch。

数据复制策略有两种,一种是SimpleStrategy,适合一个数据中心的情况,第一份数据放在Partitioner确定的节点,后面的放在顺时针找到的节点上,它不考虑跨数据中心和机架的复制。另外一种是NetworkTopologyStargegy,第一份数据和前一种一样,第二份复制的数据放在不同的机架上,每个数据中心可以有不同数据的replicas。

Partitioner策略有三种,默认是Murmur3Partitioner,使用MurmurHash。RandomPartitioner,使用Md5 Hash。ByteOrderedPartitioner使用数据的字节进行有顺分区。Cassandra默认使用MurmurHash,这种有更高的性能。

那么如果Cassandra集群动态扩展怎么办?数据怎么流动呢?如果是依次按顺序流动那么效率非常低下。这里就要提到Cassandra 的一个Vnode (virtual nodes)概念。就是把一个节点分成默认是256个Vnode来拥有较多不连续的hash值范围,以达到数据的负载的目的。

undefined

Cassandra 的写请求

先为了能够持久化与宕机恢复会写入CommitLog

– 对应的配置: commitlog_directory 

同时写入Memtable 

– Memtable : 内存之中的数据结构 

每当Memtable的数据达到一定条件时将数据Flush到SSTable 

– 条件在配置文件之中定义 

• memtable_heap_space_in_mb

• memtable_offheap_space_in_mb

– SSTable

• 真正存储到了硬盘之上:data_file_directories

• SSTable是不可变的,每次会将SSTable完全删除再新写一个 

 Flush之后,CommitLog会被自动删除。

undefined

如果客户端配置了Consistency Level是ONE,意味着只要有一个节点写入成功,就由代理节点(Coordinator)返回给客户端写入完成。当然这中间有可能会出现其它节点写入失败的情况,Cassandra自己会通过Hinted Handoff或Read Repair 或者Anti-entropy Node Repair方式保证数据最终一致性。

Cassandra的读请求

上面提到,Cassandra在读取数据有优势。在读取时,Cassandra首先检查Bloom filter,每一个SSTable都有一个Bloom filter用来检查partition key是否在这个SSTable,这一步是在访问任何磁盘IO的前面就会做掉。如果存在,再检查partition key cache,然后再做如下操作:

如果在cache中能找到索引,到compression offset map中找拥有这个数据的数据块,从磁盘上取得压缩数据并返回结果集。如果在cache中找不到索引,搜索partition summary确定索引在磁盘上的大概位置,然后获取索引入口,在SSTable上执行一次单独的寻道和一个顺序的列读取操作,下面也是到compression offset map中找拥有这个数据的数据块,从磁盘上取得压缩数据并返回结果集。读取数据时会合并Memtable中缓存的数据、多个SSTable中的数据,才返回最终的结果。

undefined

读请求(Read Request)分两种,一种是Rirect Read Request,根据客户端配置的Consistency Level读取到数据即可返回客户端结果。一种是Background Read Repair Request,除了直接请求到达的节点外,会被发送到其它复制节点,用于修复之前写入有问题的节点,保证数据最终一致性。

客户端读取时,Coordinator首先联系Consistency Level定义的节点,发送请求到最快响应的复制节点上,返回请求的数据。如果有多个节点被联系,会在内存比较每个复制节点传过的数据行,如果不一致选取最近的数据(根据时间戳)返回给客户端,并在后台更新过期的复制节点,这个过程被称作Read Repair。

Cassandra的数据整理

更新操作不会立即更新,这样会导致随机读写磁盘,效率不高,Cassandra会把数据顺序写入到一个新的SSTable,并打上一个时间戳以标明数据的新旧。它也不会立马做删除操作,而是用Tombstone来标记要删除的数据。Compaction时,将多个SSTable文件中的数据整合到新的SSTable文件中,当旧SSTable上的读请求一完成,会被立即删除,空余出来的空间可以重新利用。

虽然Compcation没有随机的IO访问,但还是一个重量级的操作,一般在后台运行,并通过限制它的吞吐量来控制,`compaction throughput mb per sec参数可以设置,默认是16M/s。另外,如果key cache显示整理后的数据是热点数据,操作系统会把它放入到page cache里以提升性能。简单来说,它的合并的策略有两种。

undefined

1、SizeTieredCompactionStrategy :每次更新不会直接更新原来的数据,这样会造成随机访问磁盘,性能不高,而是在插入或更新直接写入下一个sstable,这样是顺序写入速度非常快,适合写敏感的操作。但是,因为数据分布在多个sstable,读取时需要多次磁盘寻道,读取的性能不高。为了避免这样情况,会定期在后台将相似大小的sstable进行合并,这个合并速度也会很快,默认情况是4个sstable会合并一次,合并时如果没有过期的数据要清理掉,会需要一倍的空间,因此最坏情况需要50%的空闲磁盘。

2、LeveledCompactionStrategy:创建固定大小默认是5M的sstable,最上面一级为L0下面为L1,下面一层是上面一层的10倍大小。这种整理策略读取非常快,适合读敏感的情况,最坏只需要10%的空闲磁盘空间,它参考了LevelDB的实现。

后记

从Demo搭建到扩展到生产环境上,充分体验到了Cassandra的易用和可扩展性。它极大降低了环境的配置和解决问题之间的运维时间,能把更多的时间转到实际开发中。

400-8786-123