重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
键值查询是很常见的查询场景,在数据表上建有索引后,即使表中数据记录数巨大(几亿甚至几十亿行),用键值查询出单条记录也会很快,因为建立索引后的复杂度只有 logN(以 2 为底)次, 10 亿行数据也只要比较 30 次(10 亿约等于 2^30),在现代计算机上也只需要数十毫秒而已。
创新互联是一家业务范围包括IDC托管业务,网站空间、主机租用、主机托管,四川、重庆、广东电信服务器租用,达州托管服务器,成都网通服务器托管,成都服务器租用,业务范围遍及中国大陆、港澳台以及欧美等多个国家及地区的互联网数据服务公司。
不过,如果需要查询的键值很多,比如多达几千甚至几万的时候,如果每次都独立查找,那读取和比较也会累积到几万甚至几十万次,时间延迟由此也会涨到几十分钟甚至小时级别,这时候简单地使用数据库索引对于用户体验必然是难以容忍的了。
下面我们要介绍的集算器组表功能,基于高性能索引和批量键值查找,可以有效地应对这种场景。我们会按照以下几种顺序逐步深入讨论:
1)单字段键
2)多字段键
3)多线程查询
4)数据追加的处理
需要说明的,本文只研讨单机的情况,后续还有文章会继续深入讨论基于集群的方案。
我们以下表这种比较典型的数据结构为例:
字段名称 | 类型 | 是否主键 | 说明 |
id | long | 是 | 从 1 开始自增 |
data | string | 需要获取的数据 |
首先我们创建一个组表,把源数据导入组表:
A | |
1 | =file("single.ctx") |
2 | =A1.create(#id,data) |
3 | =file("single_source.txt") |
4 | =A3.cursor@t() |
5 | =A2.append(A4) |
A1:建立并打开指定文件对象,这里的 single.ctx 是将要创建的组表文件,扩展名用 ctx。关于组表的详细定义和使用方法可以参考集算器教程。
A2:创建组表的数据结构为(id,data)。其中,# 开头的字段称为维字段,组表将假定数据按该字段有序,也就是组表 A2 将对键字段 id 有序。组表默认使用列存方式。
A3:假定源数据以文本方式存储,A3 打开数据文件。这个 txt 文件的数据表头以及前几行部分数据如下图所示。当然,源数据也可以来自数据库等其它类型的数据源。
A4:针对源数据生成游标。其中 @t 选项指明文件中第一行记录是字段名。
A5:将游标 A4 中的记录追加写入组表。
上面的脚本假定主键 id 在原始数据中已经是有序的了,如果实际情况的主键并非有序,那就需要先将主键排序后再建为组表。这时可以使用cs.sortx()函数排序,具体方法详见函数参考。
在集算器的设计器中,通过三行代码,可以直观看到其中前十条数据,代码和截图如下所示:
A | |
1 | =file("single.ctx") |
2 | =A1.create() |
3 | =A2.cursor().fetch(10) |
A1:打开指定文件名的组表文件对象。
A2:f.create(),函数中无参数时则直接打开组表。
A3:使用游标访问 A2 中的前十条数据,如下图。
接下来,我们为组表文件建立索引,以提升检索性能:
A | |
1 | =file("single.ctx") |
2 | =A1.create() |
3 | =A2.index(id_idx;id;data) |
A1:打开指定文件名的组表文件对象。
A2:使用无参数的 create 函数打开组表。
A3:建立索引。在函数 T.index() 中,参数 id_idx 是索引名称,id 是维,data 是数据列。一般情况下,建索引并不需要使用数据列,在用索引查找时会到原数据表中再去找数据。不过,本例在建立索引时将数据列也包含进了索引,这样查找时就不再引用数据列了,虽然占用的空间会大一些,但是查找的也会更快一些。
按维字段建索引时会利用组表已经有序的特点不再排序。如果开始建组表时没有使用 #,那么这时建索引时就会重新排序。
使用主、子程序调用的方式来完成查询:
查询子程序脚本 search.dfx:
A | |
1 | =file("single.ctx") |
2 | =A1.create() |
3 | =keys |
4 | =A2.icursor(;A3.contain(id),id_idx) |
5 | >file("result.txt").export@t(A4) |
A3:keys 是参数,由下面的主程序在调用时传递。
A4:在组表的 icursor()这个函数中,使用索引 id_idx,以条件 A3.contain(id) 来过滤组表。集算器会自动识别出 A3.contain(id) 这个条件可以使用索引,并会自动将 A3 的内容排序后从前向后查找。
A5:将 A4 中查询出的结果导出至 result.txt。这里 @t 选项指定导出时将输出字段名。
主程序脚本:
A | |
1 | =file("keys.txt").import@i() |
2 | =call("search.dfx",A1) |
A1:从keys.txt获取查询键值序列,因为只有一列结果,可以使用 @i 选项,将结果返回成序列:
这个序列就是需要进行查询的随机键值集。例子中使用 keys.txt 来预先存好随机的键值,实际应用中,也可以用其他数据源来存储。
A2:调用子程序 serach.dfx,把 A1 获得的键值集作为参数传递给子程序。
下面就是结果文件 result.txt 中的部分内容:
另外,我们还可以将集算器嵌入到 Java 应用程序中,从而为 Java 应用提供灵活、简便的数据查询能力。嵌入时可以像用 JDBC 访问数据库那样访问集算器脚本。具体的写法可以参阅教程《被 JAVA 调用》一节。
本例的单字段键查询示例,在数据结构上较为简单。其中查询的键值字段为 id,需要获取的数据为单列的 data,如果还有其它列,例如:
字段名称 | 类型 | 是否主键 | 说明 |
id | Long | 是 | 从 1 开始自增 |
data1 | String | 需要获取的数据 1 | |
data2 | Int | 需要获取的数据 2 | |
…… | …… | …… | |
dataN | …… | 需要获取的数据 N |
那么在建立索引步骤时,就应该包含多个数据列字段,数据列参数的写法如下所示:
A | |
1 | =file("single.ctx") |
2 | =A1.create() |
3 | =A2.index(id_idx;id;data1,data2,…,dataN) |
在接下来要讨论的多字段键情况中,建索引时需要建立多个索引字段,对应参数部分也有类似的写法:index(id_idx;id1,id2,…,idN;data1,data2,…,dataN)。
多字段健指的是联合主键的情况,例如:
字段名称 | 类型 | 是否主键 | 说明 |
type | string | 可枚举 | |
Id | long | 每种枚举类型的 id 都从 1 开始自增 | |
data | string | 需要获取的数据 |
其中 type 和 id 两个字段作为联合主键确定一条记录。
A | |
1 | =file("multi.ctx") |
2 | =A1.create(#type,#id,data) |
3 | =file("multi_source.txt") |
4 | =A3.cursor@t() |
5 | =A2.append(A4) |
本例中 A2 需要指定两个维,type和 id,代码其它部分与单字段键一致。
A | |
1 | =file("multi.ctx") |
2 | =A1.create() |
3 | =A2.index(type_id_idx;type,id;data) |
由于本例中有两个维,建立索引时需要包含 type 和 id 两个维,如 A3 所示。
A | |
1 | =file("multi.ctx") |
2 | =A1.create() |
3 | =[["type_a",55],["type_b",66]] |
4 | =A2.icursor(;A3.contain([type,id]),type_id_idx) |
5 | >file("result.txt").export@t(A4) |
A3准备了两条数据,是由 type 和 id 构成的二元组,作为查找的建值集,结构如下图所示:
A4:A3.contain([type,id]),基于二元组的序列进行数据的筛选,所以需要将 contain 函数中的参数也变为二元组。
最终导出的结果文件内容如下:
虽然多字段键可以直接使用,但是涉及到集合的存储和比较都要慢一些。为了获取高性能,更常用的办法是把多字段键拼成单字段键。
观察本例数据结构,虽然 type 是个串,但却是可枚举的,因此可以将 type 数字化后,与 id 合并为一个新的主键字段。而 long 类型最大值为 2^63-1,完全可以容纳 id 和 type 数字化后的合并结果。我们把 type 和 id 合并后的新主键叫做 nid,可以按数据的规模,确定 nid 中分别用几位代表 type 和 id。
举例来说,id 的范围是 9 位数,type 的枚举个数用 3 位数表示就够了。因此对于 nid 而言,需要 13 位(为了避免前几位是 0,看上去不整齐,我们把第一位数字设为 1)。这样就可以把联合主键变成单字段的唯一主键,去掉第一位后的 12 位数,前 3 位代表数字化后的 type,后 9 位就是原来的 id。
代码如下:
A | |
1 | =["type_a",……,"type_z","type_1",……,"type_9","type_0"] |
2 | =A1.new(#:tid,~:type) |
3 | =file("multi_source.txt") |
4 | =A3.cursor@t() |
5 | =A4.switch(type,A2:type) |
6 | =A4.new(1000000000000+type.tid*long(1000000000)+id:nid,data) |
7 | =A4.skip(99999995) |
8 | =A4.fetch(10) |
A1:type 的枚举值组成的序列。在实际情况中,枚举列表可能来自文件或者数据库数据源。。
A2:给枚举值序列中每个 type 一个 tid。为后续的数字化主键合并做准备。
A3~A6:从 multi_source.txt 文件中获取数据,并按照 A2 中的对应关系,把 type 列的枚举串变成数字,然后将 type 和 id 进行合并后,生成新的主键 nid。
A7~A8:查看一下合并逐渐后的数据情况,跳过游标 A4 的前 99999995 条记录后,取 10 条记录,结果如下:
这样就得到了新的“单字段建”的数据结构:
字段名称 | 类型 | 是否主键 | 说明 |
nid | long | 是 | 包含 type 和 id 信息的唯一主键 |
data | string | 需要获取的数据 |
接下来按照 "单字段键" 中的做法就可以处理了,当然还要注意确保 nid 有序。
在上述方法的基础上,我们还可以采用多线程并行方式来进一步提高性能。
所谓多线程并行,就是把数据分成 N 份,用 N 个线程查询。但如果只是随意地将数据分成 N 份,很可能无法真正地提高性能。因为将要查询的键值集是未知的,所以理论上也无法确保希望查找的数据能够均匀分布在每一份组表文件中。比较好的处理方式是先观察键值集的特征,从而尽可能地进行数据的均匀拆分。
比如说,继续使用上文中多字段键拼成单字段键的例子,将合并后的主键 nid 对 4 取模,余数相同的数据存在同一个组表中,最终由 4 个组表文件装载现有全部数据。这样的文件拆分方法,可以使被查询的数据分布的相对更加均匀一些。
如果键值数据有比较明显的业务特征,我们可以考虑按照实际业务场景使用日期、部门之类的字段来处理文件拆分。如:将属于部门 A 的 1000 条记录均分在 10 个文件中,每个文件就有 100 条记录。在利用多线程查询属于部门 A 的记录时,每个线程就会从各自对应的文件中取数相应的这 100 条记录了。
下面我们来看个实际的例子。
A | |
1 | =["type_a",……,"type_z","type_1",……,"type_9","type_0"] |
2 | =A1.new(#:tid,~:type) |
3 | =file("multi_source.txt") |
4 | =A3.cursor@t() |
5 | =A4.switch(type,A2:type) |
6 | =A4.new(1000000000000+type.tid*long(1000000000)+id:nid,data) |
7 | =N.(file("nid_"+string(~-1)+"_T.ctx").create(#nid,data)) |
8 | =N.(eval("channel(A4).select(nid%N=="+string(~-1)+").attach(A7("+string(~)+").append(~.cursor()))")) |
9 | for A6,500000 |
A1~A6:与多字段键的方法二一致。
A7:使用循环函数,创建名为“键值名 _ 键值取 N 的余数 _T.ctx”的组表文件,其结构同为 (#nid,data)。
A8:用循环函数将游标数据分别追加到 N 个原组表上。比如当 N=1 时,拼出的 eval 函数参数为:channel(A4).select(nid%4==0).attach(A7(1).append(~.cursor()))。意思是对游标 A4 创建管道,将管道中记录按键值 nid 取 4 的余数,将余数值等于 0 的记录过滤出来。attach 是对当前管道的附加运算,表示取和当前余数值对应的原组表,将当前管道中筛选过滤出的记录,以游标记录的方式追加到 A7(1),即第 1 个组表。
A9:循环游标 A6,每次获取 50 万条记录,直至 A6 游标中的数据取完。
执行后,产出 4(这时例子取 N=4)个独立的组表文件:
A | B | |
1 | fork directory@p("nid*T.ctx") | =file(A1).create().index(nid_idx;nid;data) |
A1:列出满足 nid*T.ctx 的文件名(这里 * 为通配符),这里 @p 选项代表需要返回带有完整路径信息的文件名。使用 fork 执行多线程时,需要注意环境中的并行限制数是否设置合理。这里用了 4 个线程,设计器中对应的设置如下:
B2:每个线程为各个组表建立对应的索引文件,最终结果如下:
A | B | |
1 | =file("keys.txt").import@i() | |
2 | =A1.group(~%N) | |
3 | fork N.(~-1),A2 | =A3(2) |
4 | =file("nid_"/A3(1)/"_T.ctx").create().icursor(;B3.contain(nid),nid_idx) | |
5 | return B4 | |
6 | =A3.conjx() | |
7 | =file("result_nid.txt").export@t(A6) |
A1:从 keys.txt 获取查询键值序列,因为只有一列结果,使用 @i 选项,将结果返回成序列:
A2:把 A1 的序列按 4 的余数进行等值分组:
A3、B3~B5:用 fork 函数,按等值分组后的键值对各个组表分别并行查询。这里的 fork 后面分别写了两个参数,第一个是循环函数 N.(~-1),第二个是 A2。在接下来的 B3、B4 中分别使用 A3(2) 和 A3(1) 来获取 fork 后面这两个对应顺序的参数,B4:对组表文件进行根据 B3 中的键值集进行数据筛选,B5:返回游标。由于 A3 中是多个线程返回的游标序列,所以 A6 中需要使用 conjx 对多个游标进行纵向连接。
A6~A7:将多个线程返回的游标进行纵向连接后,导出游标记录至文本文件,前几行内容如下。
前面我们已经解决了针对大数据的批量随机键值查询问题,不过,我们不能假定数据永远不变。尤其是对于大数据来说,新数据的追加是必然要面对的。在将新数据追加到原有组表文件中时,我们需要讨论三种情况:有序键值追加、无序键值追加,以及数据量很大时的数据追加。
单个文件时,如果键值有序,追加的代码如下:
A | |
1 | =file("single.ctx") |
2 | =A1.create() |
3 | =file("single_add.txt") |
4 | =A3.cursor@t() |
5 | =A2.append(A4) |
A1:single.ctx 是已有的组表文件,结构为 (#id,data),其中 id 为自增键值。
A3~A5:新数据文件与已有文件结构相同,其 id 加入原组表后,对于整体数据也是有序的。这种情况可以直接追加到原组表,组表会自动更新索引。
如果按按多线程的方法拆分为多个文件,代码如下:
A | |
1 | =file("single_add.txt") |
2 | =A1.cursor@t() |
3 | =directory@p("id*T.ctx").(file(~).create()) |
4 | =N.(eval("channel(A2).select(id%N=="+string(~-1)+").attach(A3("+string(~)+").append([~.cursor()]))")) |
5 | for A2,500000 |
A1、A2:用游标方式获取新增数据。
A3:满足通配符串:"id*T.ctx",现有 N 份组表文件的序列。
A4、A5:与前面方法中的代码一致。
同样先来看一下单个文件的追加方法,以单字段键为例,代码如下:
A | |
1 | =file("single.ctx") |
2 | =A1.create().cursor() |
3 | =file("single_add.txt") |
4 | =A3.cursor@t() |
5 | =file("single.ctx_temp").create(#id,data) |
6 | =A5.append([A2,A4].mergex(id)) |
A2:游标方式打开现有组表。
A4:游标方式获取新增数据。
A5:建个新的组表文件。
A6:在新的组表中存放现有组表数据和新增数据归并后的结果。这里要注意的是,用 cs.mergex(x) 进行归并操作,需要 cs 序列对 x 有序,也就是要求组表文件 A1 和新增数据文件 A3 中的数据对于 id 都分别有序。若不满足 cs 对 x 有序,程序虽然不会报错,但是归并出来的结果也是无序的。
当这段代码执行完后,还需要进行旧组表、旧索引的清理以及对新组表的建立索引等操作:
A | |
1 | =movefile(file("single.ctx")) |
2 | =movefile(file("single.ctx__id_idx")) |
3 | =movefile(file("single.ctx_temp"),"single.ctx")) |
4 | =file("single.ctx").create().index(id_idx;id;data) |
前三行是文件操作,详见函数参考:movefile。A4 为组表建立索引,不再详述。
下面再看看多个文件的追加方法,以多字段键转单字段键后的数据结构 (nid,data) 为例,代码如下:
A | |
1 | =["type_a",……,"type_z","type_1",……,"type_9","type_0"] |
2 | =A1.new(#:tid,~:type) |
3 | =file("multi_source_add.txt") |
4 | =A3.cursor@t() |
5 | =A4.switch(type,A2:type) |
6 | =A4.new(1000000000000+type.tid*long(1000000000)+id:nid,data) |
7 | =directory@p("nid*T.ctx").(file(~).create().cursor()) |
8 | =directory@p("nid*T.ctx").(file(~+"_temp").create(#nid,data)) |
9 | =N.(eval("channel(A4).select(nid%N=="+string(~-1)+").attach(A8("+string(~)+").append([~.cursor(),A7("+string(~)+")].mergex(nid)))")) |
10 | for A4,500000 |
A3:multi_source_add.txt 是新增数据来源。
A7:假设原组表已存在,列出原组表的文件名,依次获取组表游标,返回成序列。
A8:建立新的组表文件,用来存放旧组表数据和新增数据,在原有文件名后加上 "_temp",以示区别。
A9:对新增数据使用管道,将管道中的 N 份游标记录与对应的 N 个旧份组表中游标记录进行归并,追加到新 N 份组表中。上文已有详细的解释。
当这段代码执行完后,还需要进行旧组表、旧索引的清理以及对新组表的索引建立等操作,如下:
A | |
1 | =directory@p("*T.ctx_temp") |
2 | =A1.(left(~,len(~)-5)) |
3 | =A2.(movefile(file(~))) |
4 | =A1.(left(~,len(~)-5)+"__nid_idx") |
5 | =A2.(movefile(file(~))) |
6 | =A1.(movefile(file(~),left(~,len(~)-5))) |
7 | =A2.(file(~).create().index(nid_idx;nid;data)) |
代码中几乎全是循环函数与简单的文件操作。详见函数参考《文件》。最后一行建立索引,前文中也已多次解释。
随着新数据不断增加,每次新追加数据与全量历史数据归并的时间成本将会越来越高。这时需要把每份组表文件分为新、旧两份,新的一份是最近一段时间内积累的追加数据,旧的是之前的历史数据。每当有新数据需要追加时,还是按 2.4.2 的处理思路操作,但只对新的那份组表文件进行处理。当新份数据文件超过一定大小阈值(如 100G),再和旧数据合并。这样的做法不仅可以减少归并的时间成本,另一方面也可以降低对磁盘的损耗。
列举的数据结构还是以 (nid,data) 为例,这次我们从头开始完整地看一遍代码:
首先定义新、旧组表文件,命名规则如下:
新份组表:键值名 _ 键值取 N 的余数 _T.ctx;旧份组表:键值名 _ 键值取 N 的余数 _H.ctx。
1、 建立新、旧组表,本例中 N=4,代表建立 4 份组表:
A | |
1 | =N.(file("nid_"+string(~-1)+"_H.ctx").create(#nid,data)) |
2 | =N.(file("nid_"+string(~-1)+"_T.ctx").create(#nid,data)) |
N 取 4,生成的历史和临时组表文件如下:
2、 在新组表上追加新数据。
A | |
1 | =["type_a",……,"type_z","type_1",……,"type_9","type_0"] |
2 | =A1.new(#:tid,~:type) |
3 | =file("multi_source_.txt") |
4 | =A3.cursor@t() |
5 | =A4.switch(type,A2:type) |
6 | =A4.new(1000000000000+type.tid*long(1000000000)+id:nid,data) |
7 | =directory@p("nid*T.ctx").(file(~).create().cursor()) |
8 | =directory@p("nid*T.ctx").(file(~+"_temp").create(#nid,data)) |
9 | =N.(eval("channel(A4).select(nid%N=="+string(~-1)+").attach(A8("+string(~)+").append([~.cursor(),A7("+string(~)+")].mergex(nid)))")) |
10 | for A4,500000 |
3、 新组表合并后,清理原来的新组表和索引,然后重建新组表索引。
A | |
1 | =directory@p("*T.ctx_temp") |
2 | =A1.(left(~,len(~)-5)) |
3 | =A2.(movefile(file(~))) |
4 | =A1.(left(~,len(~)-5)+"__nid_idx") |
5 | =A2.(movefile(file(~))) |
6 | =A1.(movefile(file(~),left(~,len(~)-5))) |
7 | =A2.(file(~).create().index(nid_idx;nid;data)) |
4、 对新数据大小进行判断,如果超过参数 B(单位是字节数)则与旧份组表数据合并。
A | B | C | |
1 | fork directory@p("nid*T.ctx") | =file(A1) | |
2 | if B1.size()>B | =left(A1,(len(A1)-5))+"H.ctx" | |
3 | =B1.create().cursor() | ||
4 | =file(C2).create().cursor() | ||
5 | =left(A1,(len(A1)-5))+"H.ctx_temp" | ||
6 | =file(C5).create(#nid,data).append([C3,C4].mergex(nid)) |
5、 旧组表与新组表合并后,清理原来的旧组表和索引,然后重建旧组表索引。清理已合并的新组表,并重建空的新组表。
A | |
1 | =directory@p("*H.ctx_temp") |
2 | =A1.(left(~,len(~)-5)) |
3 | =A2.(movefile(file(~))) |
4 | =A1.(left(~,len(~)-5)+"__nid_idx") |
5 | =A4.(movefile(file(~))) |
6 | =A1.(movefile(file(~),left(~,len(~)-5))) |
7 | =A2.(file(~).create().index(nid_idx;nid;data)) |
8 | =A1.(left(~,len(~)-10)+"T.ctx") |
9 | =A8.(movefile(file(~))) |
10 | =A1.(left(~,len(~)-10)+"T.ctx__nid_idx") |
11 | =A10.(movefile(file(~))) |
12 | =A8.(file(~).create(#nid,data)) |
6、 对新、旧组表文件分别利用多线程进行查询
A | B | |
1 | =file("keys.txt").import@i() | |
2 | =A1.group(~%N) | |
3 | fork directory@p("*H.ctx"),directory@p("*T.ctx"),A2 | =A3(3) |
4 | =file(A3(1)).create().icursor(;B3.contain(nid),nid_idx) | |
5 | =file(A3(2)).create().icursor(;B3.contain(nid),nid_idx) | |
6 | return B4|B5 | |
7 | =A3.conj() | |
8 | =file("result.txt").export@t(A8) |
这里需要注意 A7 中写法,因为 B6 中返回 B4|B5,所以导致 A3 的结果为多个游标序列的序列,因此在对 A3 进行纵向连接时,应该使用序列的 conj,而不是游标的 conjx。
至此,基于本文的 6 个集算器脚本文件,在第三方定时任务调度工具的合理调用下,可以实现单机情况下大数据量数据的追加,以及针对批量随机键值的查询工作。