
I need to improve the speed of the following algorithm:

Take a list $x=\{x_1,\dots,x_n\}$. For each integer $i=1,\dots,n$ construct the following list, where $z=x_i$ if $i=1$ and $z=x_{i-1}$ otherwise:

$X_i=\{x_{|-n|+i},\dots,x_{|-1|+i},x_i,z,x_i,x_{1+i},\dots,x_{n-i}\}$

For each $X_i$, take the average of neighboring points to construct

$Y_i=\{\frac{1}{2}(x_{|-n|+i}+x_{|-n|+i-1}),\dots,\frac{1}{2}(x_i+z),\frac{1}{2}(z+x_i),\dots,\frac{1}{2}(x_{n-1-i}+x_{n-i})\}$
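For example, with $n=3$ the rays are symmetric about the center and shrink with $i$ (computed directly from the definition above):

```latex
X_1=\{x_3,x_2,x_1,x_1,x_1,x_2,x_3\},\qquad
X_2=\{x_3,x_2,x_1,x_2,x_3\},\qquad
X_3=\{x_3,x_2,x_3\}.
```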

My current solution looks like this:

averageGridPoints[{a_, b_}] := (a + b)/2
constructRay[i_Integer, list_List] := 
     Module[{nrd = Length[list] - 1}, 
       Join[
            Table[list[[Abs[j] + i]], {j, -nrd + i - 1, -1}],
            If[i == 1, 
              {list[[i]], list[[i]], list[[i]]}, 
              {list[[0 + i]],list[[0 + i - 1]], list[[1 + i - 1]]}
              ],
            Table[list[[Abs[j] + i]], {j, 1, nrd - i + 1}]
           ]]
constructAverageRay[i_Integer, list_List] := averageGridPoints /@ Partition[constructRay[i, list], 2, 1]
constructAverageRay[i_List, list_List] := Map[(averageGridPoints /@ Partition[constructRay[#, list], 2, 1]) &, i]
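For illustration, on a short symbolic list (the names a, b, c, d are placeholders, not from the problem):

```mathematica
constructRay[2, {a, b, c, d}]
(* {d, c, b, a, b, c, d} *)

constructAverageRay[2, {a, b, c, d}]
(* {(c + d)/2, (b + c)/2, (a + b)/2, (a + b)/2, (b + c)/2, (c + d)/2} *)
```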

I use the compiled version of constructRay to speed things up:

constructRayC = Compile[{{i, _Integer}, {list, _Real, 1}}, 
    Module[{nrd = Length[list] - 1}, Join[
      Table[list[[Abs[j] + i]], {j, -nrd + i - 1, -1}],
      If[i == 1, 
      {list[[i]], list[[i]], list[[i]]}, 
      {list[[0 + i]],list[[0 + i - 1]], list[[1 + i - 1]]}],
      Table[list[[Abs[j] + i]], {j, 1, nrd - i + 1}]]], 
      CompilationTarget -> "C", RuntimeOptions -> "Speed",RuntimeAttributes -> {Listable}
      ]
constructAverageRayC[i_List, list_List] :=  Map[(averageGridPoints /@ Partition[constructRayC[#, list], 2, 1]) &, i]

The resulting lists look like this:

lst = Table[Subscript[x, i], {i, 1, 10}]

(* {x1, x2, x3, x4, x5, x6, x7, x8, x9, x10} *)

constructRay[1, lst]

(* {x10, x9, x8, x7, x6, x5, x4, x3, x2, x1, x1, x1, x2, x3, x4, x5, x6, x7, x8, x9, x10} *)

constructRay[4, lst]

(* {x10, x9, x8, x7, x6, x5, x4, x3, x4, x5, x6, x7, x8, x9, x10} *)

I just noticed that I can speed everything up by using packed arrays, but I wonder whether there is still some room for performance improvements. Here are my timings for a single run:

Needs["Developer`"]
n = 1000;
list0 = Range[n];
SeedRandom[123];
list1 = RandomReal[{0, 1}, n];
list1p = ToPackedArray[list1];

AbsoluteTiming[res1 = constructRay[1, list1];]
AbsoluteTiming[res2 = constructRayC[1, list1];]

AbsoluteTiming[res3 = constructRay[1, list1p];]
AbsoluteTiming[res4 = constructRayC[1, list1p];]

(* {0.0005527, Null}
   {0.000146, Null}
   {0.000465, Null}
   {0.000027, Null} *)

res1 == res2 == res3 == res4

(* True  *)
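To verify that the data actually stays packed through the compiled call, one can check (the Developer` context is already loaded above; the exact pattern of results depends on list length and version):

```mathematica
Developer`PackedArrayQ /@ {res1, res2, res3, res4}
```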

Unfortunately, packing does not speed things up when the function is mapped over all indices:

AbsoluteTiming[resc = constructAverageRayC[list0, list1];]
AbsoluteTiming[resp = constructAverageRayC[list0, list1p];]
resc == resp

(* {1.04148, Null}
   {1.0311, Null}
   True *)

I can speed up the routine a little by reversing the first part instead of recreating it, but the effect is small:

constructRay2[i_Integer, list_List] := Module[
     {nrd = Length[list] - 1, indx, lst},
     indx = Table[Abs[j] + i, {j, -nrd + i - 1, -1}];
     lst = list[[indx]];
     Join[
         list[[indx]],
         If[i == 1, 
         {list[[i]], list[[i]], list[[i]]}, 
         {list[[0 + i]],list[[0 + i - 1]], list[[1 + i - 1]]}],
         Reverse[lst]
         ]]

I have to create these arrays a lot and later have to integrate over them, so any little performance boost will add up.

Any further help is really appreciated. For the interested parties: the context of this is ray tracing in spherical geometries.

EDIT

Following the comments, I will show the full workflow in order to assess where performance could be gained. Apologies for the long post, but maybe it is interesting to some.

To give some context: the $x_i$ are quantities along the radius of a sphere with corresponding radii $r=\{r_1,\dots,r_N\}$. The goal is to perform, at each $r_i$, the angular average (over the half solid angle, actually) of the integral of $X$ out to the sphere's surface. $Y_1$ is the central line of sight; the other $Y_i$ are parallel lines of sight with increasing impact parameter.

As an additional ingredient we need the distances between the elements of each $Y_i$. I store this information in $L$, an $N\times 2N$ array. We also need an array with the angle information.

An example workflow looks like this (with timings):

n = 1000;
SeedRandom[123];
rden = Sort@RandomReal[{0, 1}, n];
pop = RandomReal[{0, 1}, n];

AbsoluteTiming[
 gL = geodat[rden];
 L = constructRayLengthC[Range[1, Length[rden]], gL];]
(* {1.69037, Null} *)

AbsoluteTiming[
 angles = ConstructAngleArray[rden];]
(* {1.00136, Null} *)

The above is the same for all computations. The steps below have to be repeated for each array pop.

AbsoluteTiming[
 fai = constructFAIarrayC[pop, L];]
(* {1.15699, Null} *)

AbsoluteTiming[
 faish = constructFAISHarrayC[fai];]
(* {0.0857162, Null} *)

AbsoluteTiming[
 int = angleIntC[faish, rden, angles];]
(* {0.0430777, Null} *)

int contains the angular average (over $2\pi$) of the pop array at each position of rden. Given the timings, I concluded that constructing fai has the highest potential for saving CPU time.

Here is the code:

constructFAIarrayC[list_, lengthArray_] :=
  Transpose[
    MapIndexed[ArrayPad[#1, {First@#2, First@#2 - 1}] &,
      Accumulate /@ (constructAverageRayC[Range[Length[list]], list]*lengthArray)]]
constructFAISHarrayC = Compile[{{fai, _Real, 2}},
  Module[{dim = Dimensions[fai], nrd, faish},
   nrd = dim[[2]];
   faish = ConstantArray[0., {2 nrd + 1, nrd}];
   faish[[0 + nrd + 1, 1]] = fai[[0 + nrd + 1, 1 + 1]];
   faish[[-1 + nrd + 1, 1]] = fai[[-1 + nrd + 1, 1]];
   faish[[1 + nrd + 1, 1]] = fai[[1 + nrd + 1, 1]];
   Do[
    If[i <= nrd - 1,
     faish[[0 + nrd + 1, i]] = fai[[0 + nrd + 1, i + 1]]];
    Do[
     (* intensities for the lower half (90 < theta < 180) *)
     faish[[i - j + 1 + nrd + 1, i]] = fai[[i - j + 1 + nrd + 1, j]]
     , {j, 1, i}];
    Do[
     (* intensities for the upper half (0 < theta < 90) *)
     faish[[-i + j - 1 + nrd + 1, i]] = fai[[-i + j - 1 + nrd + 1, j]];
     , {j, 1, i}];
    , {i, nrd, 2, -1}];
   faish]]
constructRayLengthC = Compile[{{i, _Integer}, {list, _Real, 2}}, 
  Module[{nrd = Length[list]}, Join[
  Table[list[[Abs[j] + i, i]], {j, -nrd + i, -1}],
    {list[[i, i]], list[[i, i]]},
  Table[list[[Abs[j] + i - 1, i]], {j, 2, nrd - i + 1}]]], 
  CompilationTarget -> "C", RuntimeOptions -> "Speed", 
  RuntimeAttributes -> {Listable}]

geodat[r_List] := Module[
   {nr = Length[r], l, lsum, parsecCM = 3.086`*^18},
   (* r[[1]] starts from the center *)
   l = ConstantArray[0, {nr, nr}];
   l[[1, 1]] = parsecCM*r[[1]];
   Table[l[[i, 1]] = (r[[i]] - r[[i - 1]]), {i, 2, nr}];
   Table[
     lsum = 0;
     Table[
       l[[i, j]] = parsecCM*Sqrt[r[[i]]^2 - r[[j - 1]]^2] - lsum;
       lsum = l[[i, j]] + lsum;
       , {i, j, nr}]
     , {j, 2, nr}];
   l]
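If I read geodat correctly, the inner loop telescopes: for a line of sight with impact parameter $r_{j-1}$, the distance from the tangent point to the shell of radius $r_i$ is $\sqrt{r_i^2-r_{j-1}^2}$, so each stored segment (in cm, via parsecCM) is the difference of consecutive chord distances:

```latex
l_{ij} = \mathrm{parsecCM}\left(\sqrt{r_i^2 - r_{j-1}^2} - \sqrt{r_{i-1}^2 - r_{j-1}^2}\right),
\qquad i > j,
```

with $l_{jj} = \mathrm{parsecCM}\,\sqrt{r_j^2 - r_{j-1}^2}$ starting each column.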
ConstructAngleArray[rden_List] /; Depth[rden] == 2 := Module[{r = rden, anglePerRay}, 
  (* rden is by one shorter than the hdf  lists *)
  anglePerRay = ConstantArray[0., {Length[r] + 1, Length[r]}];
  Table[
    anglePerRay[[0 + 1, i]] = \[Pi]/2;
    If[i > 1, 
     Table[anglePerRay[[j + 1, i]] = ArcSin[r[[i - j]]/r[[i]]], {j, 1, i - 1}]], {i, 1, Length[r]}];
   anglePerRay]

angleIntC = Compile[{{faish, _Real, 2}, {r, _Real, 1}, {AnglePerRay, _Real, 2}}, Module[{nrd, LI, len, SUMME},
 nrd = Dimensions[faish][[2]];
 len = nrd + 1;
 LI = ConstantArray[0., nrd];
 LI[[1]] = faish[[0 + len, 1]]/2;
 LI[[2]] = 0.5*(\[Pi]/2. - AnglePerRay[[1 + 1, 2]])*(faish[[-1 + len, 2]] + faish[[1 + len, 2]])*(2./3. + 1./3.*r[[1]]/r[[2]]);
 Do[
  (* interval J=[0,1]: special integration rule *)
  (* length of the interval *)
   SUMME = 2.0*(\[Pi]/2 - AnglePerRay[[1 + 1, i]])*(faish[[-1 + len, i]] + faish[[1 + len, i]])*(2./3. + 1./3.*r[[i - 1]]/r[[i]]);
  (* intermediate intervals: compound trapezoidal rule *)
   Do[
     SUMME = SUMME + (AnglePerRay[[j - 1 + 1, i]] - 
      AnglePerRay[[j + 1, i]])*(r[[i - j]]/
      r[[i]])*(faish[[-j + len, i]] + faish[[j + len, i]]) +
      (r[[i - j + 1]]/r[[i]])*(faish[[-j + 1 + len, i]] + 
      faish[[j - 1 + len, i]])
    , {j, 2, i - 1}];
  (* interval J=[I-1,I] *)
  SUMME = SUMME + AnglePerRay[[i - 1 + 1, 
     i]]*((r[[1]]/r[[i]])*(faish[[-i + 1 + len, i]] + 
       faish[[i - 1 + len, i]]));
  LI[[i]] = 0.25*SUMME;
   , {i, 3, nrd}];
  LI], 
 CompilationTarget -> "C", RuntimeOptions -> "Speed", RuntimeAttributes -> {Listable}]
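For orientation (my reading of the code, not stated explicitly above): with $\sin\theta_j = r_{i-j}/r_i$ as set up in ConstructAngleArray, the inner loop appears to accumulate a compound trapezoidal rule for the angular integral

```latex
\int I(\theta)\,\sin\theta\,\mathrm{d}\theta \;\approx\;
\sum_j \frac{\theta_{j-1}-\theta_j}{2}
\Bigl[\, I(\theta_j)\sin\theta_j + I(\theta_{j-1})\sin\theta_{j-1} \,\Bigr],
```

where each $I(\theta_j)$ combines the two half-space values faish[[-j + len, i]] and faish[[j + len, i]], and the factor $1/2$, together with the averaging over the two halves, is absorbed into the final 0.25*SUMME.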
  • Since memory is the slowest hardware component involved in these computations, you should not generate big arrays that you can generate more quickly on the fly. "I have to create these arrays a lot and later have to integrate over them, [...]" Then it is probably a good idea to not generate these arrays in the first place, but to immediately integrate/sum over them. You can implement it by adding to just one scalar variable, so you do not have to shove around that much memory. Commented Feb 15, 2022 at 11:28
  • Moreover, the lists $X_i$ and $Y_i$ are very redundant; you can generate the first half of each list on the fly from just the second half. So at a minimum, you should avoid storing the first half. Even better to not generate $X_i$ in the first place. The $x_1,\dotsc,x_n$ probably fit into cache. So repeatedly accessing them should be much less expensive than forming the lists $X_i$ and $Y_i$, which probably won't fit into cache and have to be loaded from lower levels in the memory hierarchy. Commented Feb 15, 2022 at 11:28
  • @HenrikSchumacher Unfortunately, the array of piecewise integrated $Y_i$ needs to be stored to create a new array by resorting the elements of $Y$, so just adding everything to a scalar is not an option here. In my second solution I already reuse half of the list, but that is not what you meant by generating on the fly, is it? Maybe I should edit my post and add the next steps that I have to perform. Thanks anyway for the comments! Commented Feb 15, 2022 at 11:45
  • Yes, showing the next step might help, even if this makes the post even longer. Commented Feb 15, 2022 at 11:50
  • Okay, I tried very hard to understand the logic behind your code for one hour now, and I failed. Your prose description of the problem also did not help, due to an abundance of field-specific lingo like "over half solid angle actually", "central line of sight", "l.o.s.", "impact parameter", "FAI". Sorry, you simply lost me there. I am still convinced, though, that you could achieve your goals with only a tiny fraction of your intermediately allocated memory (and thus much faster). Commented Feb 16, 2022 at 9:32

1 Answer


There is a simpler and faster way to achieve the main objective:

(*ray*)
conRay[i_Integer, list_List] := Which[
     i == 1, Join[Reverse@list, {list[[1]]}, list],
     i != 1, Join[Reverse@Rest[#], #] &@Drop[list, i - 2]
]
(*compiled*)
conRayC = Compile[{{i, _Integer}, {list, _Real, 1}},
     Join[Reverse@Rest@#, #] &@Drop[Join[{list[[1]]}, list], i - 1],
CompilationTarget -> "C", RuntimeOptions -> "Speed", RuntimeAttributes -> {Listable}];
(*averaged list*)
conAvgRay[i_List, list_List] := Module[{a, n, x},
     a = Join[Reverse@#, #] &@MovingAverage[Join[{list[[1]]}, list], 2];
     n = Length@list;
     x = Table[Drop[a, n + 1 - (j - 1) ;; n + (j - 1)], {j, i}]
]
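As a quick sanity check on a short symbolic list (the names a, b, c, d are placeholders, not from the question):

```mathematica
conRay[1, {a, b, c, d}]
(* {d, c, b, a, a, a, b, c, d} *)

conRay[2, {a, b, c, d}]
(* {d, c, b, a, b, c, d} *)
```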

Comparison:

list = Table[Subscript[x, i], {i, 1, 10}];

Do[constructRay[1, list], 1000] // AbsoluteTiming
Do[conRay[1, list], 1000] // AbsoluteTiming

(* {0.039294, Null}
   {0.004374, Null} *)

AllTrue[
     Table[conRay[i, list] == constructRay[i, list], {i, Length@list}]
, # == True &]

(* True *)

which is about 9 times faster. For compiled functions:

n = 10000;
list0 = Range[n];
SeedRandom[123];
list1 = RandomReal[{0, 1}, n];

Do[constructRayC[1, list1], 1000] // AbsoluteTiming
Do[conRayC[1, list1], 1000] // AbsoluteTiming

(* {0.063863, Null}
   {0.038154, Null} *)

AllTrue[
     Table[conRayC[i, list1] == constructRayC[i, list1], {i, Length@list1}]
, # == True &]

(* True *)

which roughly doubles the speed. The gap appears to grow as n gets bigger. For the averaged list with n=1000:

n = 1000;
list0 = Range[n];
SeedRandom[123];
list1 = RandomReal[{0, 1}, n];

Do[constructAverageRay[list0, list1], 10] // AbsoluteTiming
Do[conAvgRay[list0, list1], 10] // AbsoluteTiming

(* {20.4102, Null}
   {0.026772, Null} *)

conAvgRay[list0, list1] == constructAverageRay[list0, list1]

(* True *)

which is roughly 760 times faster.

  • Thanks! That is already an impressive speedup. I guess the idea is to always copy maximally large blocks of memory instead of accessing array elements individually. Surprisingly, the compiled conRayC seems to be 4-5 times slower than the uncompiled version. Any idea why? Commented Feb 15, 2022 at 16:05
  • @MarkusRoellig It depends on how you compiled the function. By the way, I improved conAvgRay. – Shin Kim Commented Feb 15, 2022 at 19:06
