Updated Oct 10: pread_file/5 should open a new FIle process each cycle
Updated Oct 05: Wrote a new version, which is more parallelized-likeness.
Updated Sep 27: Per Gustafsson gave a better solution, it took 5.155 seconds on my computer, with all record maching tasks done. And, get all lines ready without record matching only took 1.58 seconds.
Updated Sep 27: The best result on my computer is 5.188 seconds, after added -compile([native])., with: tbray:start(20, "o1000k.ap").
Tim's Wide Finder Project tried Erlang on large log file (around 200M):
Where, the Erlang code took more than 30 seconds to fetch the matched records.
I'm with a lot of interesting on Erlang's efficiency to process large dataset, so I tried several testing, and got some conclutions.
First, the file io itself in Erlang is reasonable fast, to read a 200M file into memory, using file:read_file/1 took less than 800ms on my computer.
But, to process the dataset, you can not avoid to travel the whole dataset, to find newline marks, to match the express etc.
I wrote a piece of code to travel the whole dataset in binary form, as simple as just count it byte by byte. Well, it took about 35s for a 200M file, seems travel a binary byte by byte is the major cost.
Thomas gave some hints on how to travel a binary efficiently, here.
And Bjorn from OTP term gave some hints too.
Yes, the most efficient data struct in Erlang to travel dataset byte by byte is List.
Let's take some testing:
travel_bin(Bin) -> travel_bin(Bin, 0).
travel_bin(<<>>, ByteCount) -> ByteCount;
travel_bin(<<$\n, Rest/binary>>, ByteCount) ->
travel_bin(Rest, ByteCount + 1);
travel_bin(<<_C, Rest/binary>>, ByteCount) ->
travel_bin(Rest, ByteCount + 1).
travel_list(List) -> travel_list(List, 0).
travel_list([], CharCount) -> CharCount;
travel_list([$\n|Rest], CharCount) ->
travel_list(Rest, CharCount + 1);
travel_list([_C|Rest], CharCount) ->
travel_list(Rest, CharCount + 1).
When apply to a 20M file, we got:
> {ok, Bin} = file:read_file("o100k.ap").
{ok,<<"host-24-225-218-245.patmedia.net - - [01/Oct/2006:06:33:45 -0700] \"GET /ongoing/ongoing.atom HTTP/1.1\" 304 -"...>>}
> timer:tc(tbray, travel_bin, [Bin]).
{2787402,20099550}
> timer:tc(tbray, travel_list, [binary_to_list(Bin)]).
{370906,20099550}
(Updated Oct 7: The statements about travel_list below are not quite accurate, the test of travel_list actually did not include the time taken by binary_to_list. The story of "Binary vs List in Erlang" is bit complex, the results vary depending on data size a lot. I'll post another article talking about it)
Where, travel_bin took about 2787.402ms, and travel_list took about 370.906ms (including the time costed to apply binary_to_list).
Pretty good result for travel_list, which was about 13% time costed comparing to travel_bin.
But, List is memory eater than Binary. Yes, when you try to apply above code to a file with 200M size, scene changes a lot:
> f(Bin).
ok
> {ok, Bin} = file:read_file("o1000k.ap").
{ok,<<"host-24-225-218-245.patmedia.net - - [01/Oct/2006:06:33:45 -0700] \"GET /ongoing/ongoing.atom HTTP/1.1\" 304 -"...>>}
> timer:tc(tbray, travel_bin, [Bin]).
{35414374,200995500}
> timer:tc(tbray, travel_list, [binary_to_list(Bin)]).
beam.smp(25965,0x1806400) malloc: *** vm_allocate(size=1782579200) failed (error code=3) ...
Where, size of o1000k.ap is about 200M. travel_bin took 35s, travel_list crashed.
How about split large binary to pieces, then convert them to lists, and travel them?
I tried, and, it's a bit trick. The trick is the buffer size. At first, I split binary to pieces of 1024 * 1024 size, the performance was even worse. I almost dropped. But, I tried more, when I adjusted the buffer size to 4096, this solution shines.
And finally, with a parallel file reader, I got an answer to Tim's exercise, plus a simple express matching, for an 1 million lines file (200M size), is 8.311 seconds when -smp enable, and 10.206 seconds when smp disabled.
My computer is a 2.0G 2-core MacBook, I'd like a see a result on more-core machine :-)
The code:
-module(tbray).
-compile([native]).
-export([start/2,
collect_loop/2,
buffered_read/3]).
-include_lib("kernel/include/file.hrl").
%% The best BUFFER_SIZE is 4096
-define(BUFFER_SIZE, 4096).
-record(context, {lineBuf = [],
matched = 0,
total = 0,
lastProcessedSeq = 0,
dataBuf = [],
processNum}).
%% erl -smp enable +P 60000
%% timer:tc(wide, start, [1000, "o1000k.ap"]).
start(ProcessNum, FileName) ->
statistics(wall_clock),
{ok, FileInfo} = file:read_file_info(FileName),
Size = FileInfo#file_info.size,
Collect = spawn(?MODULE, collect_loop, [self(), #context{processNum = ProcessNum}]),
psplit_read_file(Collect, FileName, Size div ProcessNum, ProcessNum, 1),
{Matched, Total} =
receive
#context{matched=MatchedX, total=TotalX} -> {MatchedX, TotalX}
end,
{_, Duration2} = statistics(wall_clock),
io:format("scan lines:\t ~pms~nMatched: ~B, Total: ~B~n", [Duration2, Matched, Total]).
psplit_read_file(_Collector, _FileName, _ChunkSize, ProcessNum, I) when I > ProcessNum -> done;
psplit_read_file(Collector, FileName, ChunkSize, ProcessNum, I) ->
spawn(
fun () ->
Offset = ChunkSize * (I - 1),
%% if it's last chuck, read all bytes left, which will not exceed ChunkSize * 2
Length = if I == ProcessNum -> ChunkSize * 2;
true -> ChunkSize
end,
{ok, File} = file:open(FileName, [read, binary]),
{ok, Data} = file:pread(File, Offset, Length),
Collector ! {I, Data}
end),
psplit_read_file(Collector, FileName, ChunkSize, ProcessNum, I + 1).
collect_loop(Pid, #context{lastProcessedSeq= ProcessNum,
processNum=ProcessNum}=Context) -> Pid ! Context;
collect_loop(Pid, #context{dataBuf=DataBuf}=Context) ->
receive
{Seq, Data} ->
SortedDatas = lists:keysort(1, [{Seq, Data} | DataBuf]),
Context1 = process_arrived_datas(SortedDatas, Context#context{dataBuf = []}),
%io:format("Last processed Seq: ~B~n", [Context1#context.lastProcessedSeq]),
collect_loop(Pid, Context1)
end.
process_arrived_datas([], Context) -> Context;
process_arrived_datas([{Seq, Data}|T], #context{lineBuf=LineBuf,
matched=Matched,
total=Total,
lastProcessedSeq=LastProcessedSeq,
dataBuf=DataBuf}=Context) ->
if Seq == LastProcessedSeq + 1 ->
{LineBuf1, Matched1, Total1} = buffered_read(
fun (Buffer, {LineBufX, MatchedX, TotalX}) ->
scan_line(binary_to_list(Buffer), LineBufX, MatchedX, TotalX)
end, {LineBuf, Matched, Total}, Data),
process_arrived_datas(T, Context#context{lineBuf = LineBuf1,
matched = Matched1,
total = Total1,
lastProcessedSeq = Seq});
true ->
process_arrived_datas(T, Context#context{dataBuf = [{Seq, Data} | DataBuf]})
end.
buffered_read(Fun, Acc, Bin) ->
case Bin of
<<Buf:?BUFFER_SIZE/binary, Rest/binary>> ->
Acc1 = Fun(Buf, Acc),
buffered_read(Fun, Acc1, Rest);
_ ->
Fun(Bin, Acc)
end.
scan_line([], LineBuf, Matched, Total) -> {LineBuf, Matched, Total};
scan_line([$\n|Rest], LineBuf, Matched, Total) ->
Line1 = lists:reverse(LineBuf),
%io:format("~n~s~n", [Line1]),
Matched1 = Matched + process_match(Line1),
scan_line(Rest, [], Matched1, Total + 1);
scan_line([C|Rest], LineBuf, Matched, Total) ->
scan_line(Rest, [C | LineBuf], Matched, Total).
process_match([]) -> 0;
process_match("GET /ongoing/When/"++Rest) ->
case match_until_space(Rest, false) of
true -> 0;
false -> 1
end;
process_match([_H|Rest]) ->
process_match(Rest).
match_until_space([$\040|_Rest], Bool) -> Bool;
match_until_space([$.|_Rest], _Bool) -> true;
match_until_space([_H|Rest], Bool) ->
match_until_space(Rest, Bool).
Some hints:
The solution spawns a lot of processes to read the file to binary in parallel. Then send them to a collect_loop, collect_loop will buffered_read each chunk (when chunks order is correct), buffered_read then converts each binary to small (4096 bytes here) lists, the scan_line will merge them to lines, and process_match on line.
As I mentioned before, handle a short string line in Erlang is fast, so I do not fork process_match to processes.
The code can handle very large files.
The matching code may not be correct, and does not finish all tasks that Tim wants.
发表评论
最近加入圈子
最新评论
-
ErlyBird 0.16.0 Released
Others->Erlang...
-- by dcaoyuan -
ErlyBird 0.16.0 Released
为什么我在NB内导入完成后,在New Project里面无法看到 create ...
-- by fatdong -
新的Scala for NetBeans提 ...
因为NetBeans的几个基础模块在Trunk里有与6.1不兼容的的变化,所以现 ...
-- by dcaoyuan -
新的Scala for NetBeans提 ...
另外,dcaoyuan大叔有空时能否简单说一下为NetBeans开发某一门语言的 ...
-- by 自言200801 -
新的Scala for NetBeans提 ...
重新更新以后就装不起来啦,错误如下:====================== ...
-- by 自言200801







评论排行榜