-
Notifications
You must be signed in to change notification settings - Fork 4
/
parallel-forkmanager-mestia.pl
100 lines (90 loc) · 2.74 KB
/
parallel-forkmanager-mestia.pl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
use warnings;
use strict;
use feature qw(say state);
use Parallel::ForkManager;
# code via /u/mestia: https://old.reddit.com/r/perl/comments/18ygpsi/1_billion_row_challenge_in_perl/kgibij8/
# Driver: read the input file line by line, buffer the lines in memory, and
# every $chunk_lines lines fork a worker that aggregates the buffered chunk
# with proc_chunk(). Each worker's result is merged into $data by
# update_global_hash() when the worker is reaped.
my $file = shift // die "Usage: $0 filename\n";

# Number of lines buffered before a worker is forked (1_000_000_000 / 32).
my $chunk_lines = 31_250_000;

# Parallel::ForkManager limits the number of simultaneously running workers.
my $pm = Parallel::ForkManager->new(8);

my $num_lines = 0;
my @arr;          # lines of the chunk currently being buffered
my $data = {};    # global per-city totals, filled by update_global_hash()

# Register the merge callback once, up front. It runs in the parent each time
# a worker is reaped; $datast is the structure the worker passed to finish().
$pm->run_on_finish(
    sub {
        my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $datast) = @_;
        if (defined $datast) {
            update_global_hash($datast);
        }
        else {
            # A worker that died returns no data; report it rather than
            # silently producing totals that miss a whole chunk.
            warn "worker $pid returned no data (exit code $exit_code, "
               . "signal $exit_signal); its chunk is missing from the totals\n";
        }
    }
);

open my $fh, '<', $file or die "Cannot open '$file': $!\n";

while (my $line = <$fh>) {
    chomp $line;
    push @arr, $line;
    next if ++$num_lines < $chunk_lines;

    if ($pm->start) {
        # Parent: the worker owns a copy of the chunk, so start a new buffer.
        $num_lines = 0;
        @arr = ();
        next;
    }

    # Child: aggregate the chunk and hand the result back to the parent.
    my $ret = proc_chunk(\@arr);
    $pm->finish(0, $ret);
}
close $fh or warn "Cannot close '$file': $!\n";

$pm->wait_all_children;

# Lines left over after the last full chunk are processed in the parent.
if (@arr) {
    my $rest = proc_chunk(\@arr);
    update_global_hash($rest);
}
# Merge one chunk's per-city statistics into the file-level $data hash.
#
# Parameters:
#   $datast - hash ref keyed by city; each value is a hash ref with the keys
#             max, min, sum and cnt (the structure returned by proc_chunk()).
#             Anything that is not a hash ref is ignored.
# Returns: nothing; $data is updated in place.
sub update_global_hash {
    my ($datast) = @_;
    return unless ref $datast eq 'HASH';

    for my $city (keys %{$datast}) {
        my $chunk = $datast->{$city};
        my $cd    = $data->{$city};

        if (!$cd) {
            $data->{$city} = $chunk;    # first time this city is seen
            next;
        }

        # max and min must be checked independently: a single chunk can
        # carry both a new maximum and a new minimum for the same city.
        $cd->{max} = $chunk->{max} if $chunk->{max} > $cd->{max};
        $cd->{min} = $chunk->{min} if $chunk->{min} < $cd->{min};
        $cd->{sum} += $chunk->{sum};
        $cd->{cnt} += $chunk->{cnt};
    }
    return;
}
# Aggregate a batch of "city;temperature" lines into per-city statistics.
#
# Parameters:
#   $_[0] - array ref of lines; each line is split on ';' into a city and a
#           temperature.
# Returns: hash ref keyed by city; each value is a hash ref holding the
#          max, min, sum and cnt of that city's temperatures in this batch.
sub proc_chunk {
    my ($lines) = @_;
    my %stats_for;

    for my $record (@{$lines}) {
        my ($station, $reading) = split(';', $record);

        if (my $seen = $stats_for{$station}) {
            # Known city: widen the range if needed, then accumulate.
            if ($reading > $seen->{max}) {
                $seen->{max} = $reading;
            }
            elsif ($reading < $seen->{min}) {
                $seen->{min} = $reading;
            }
            $seen->{sum} += $reading;
            $seen->{cnt}++;
        }
        else {
            # First reading for this city seeds every statistic.
            $stats_for{$station} = {
                max => $reading,
                min => $reading,
                sum => $reading,
                cnt => 1,
            };
        }
    }

    return \%stats_for;
}
# Print the aggregated results on one line as
# "{city=min/mean/max, city=min/mean/max}", cities sorted by name.
# join() places the separator only between entries, so there is no dangling
# ", " before the closing brace; say() supplies the single trailing newline.
say '{',
    join(
        ', ',
        map {
            my $cd = $data->{$_};
            sprintf '%s=%.1f/%.1f/%.1f',
                $_, $cd->{min}, $cd->{sum} / $cd->{cnt}, $cd->{max};
        } sort keys %$data
    ),
    '}';