前几天在书上看到 quicksort 算法, 于是到 rosettacode 上找了一下 erlang 的实现.
原理
1 +--------------------------------------------+
2 +------------------+ +----------------------+
3 +--------+ +-------+ +--------+ +-----------+
首先找到一个参照值(pivot), 再把所有元素分为(<pivot)和(>=pivot)两部分, 再重复此步骤直到排序完成.
- 单进程版本
-module( quicksort ).
-export( [qsort/1] ).
qsort([]) -> [];
qsort([X|Xs]) ->
qsort([ Y || Y <- Xs, Y < X]) ++ [X] ++ qsort([ Y || Y <- Xs, Y >= X]).
- 多进程版本
quick_sort(L) -> qs(L, trunc(math:log2(erlang:system_info(schedulers)))).
qs([],_) -> [];
qs([H|T], N) when N > 0 ->
{Parent, Ref} = {self(), make_ref()},
spawn(fun()-> Parent ! {l1, Ref, qs([E||E<-T, E<H], N-1)} end),
spawn(fun()-> Parent ! {l2, Ref, qs([E||E<-T, H =< E], N-1)} end),
{L1, L2} = receive_results(Ref, undefined, undefined),
L1 ++ [H] ++ L2;
qs([H|T],_) ->
qs([E||E<-T, E<H],0) ++ [H] ++ qs([E||E<-T, H =< E],0).
receive_results(Ref, L1, L2) ->
receive
{l1, Ref, L1R} when L2 == undefined -> receive_results(Ref, L1R, L2);
{l2, Ref, L2R} when L1 == undefined -> receive_results(Ref, L1, L2R);
{l1, Ref, L1R} -> {L1R, L2};
{l2, Ref, L2R} -> {L1, L2R}
after 5000 -> receive_results(Ref, L1, L2)
end.
rosetta 上的多进程版本有点小bug, 顺手修复了:)
测试
测试数据使用最常用的1w个英语单词. 机器是8核i7处理器.
prepare_data(N) ->
{ok, Binary} = file:read_file("google-10000-english.txt"),
binary:split(Binary, <<"\n">>, [global, trim]).
normal_sort(List) ->
timer:tc(fun qsort/1, [List]).
multi_sort(List) ->
timer:tc(fun quick_sort/1, [List]).
结果
单位: microseconds
单进程: 17730, 17809, 18402
多进程: 15605, 18645, 12845