现在我想在所有可能的ai和bj对上计算一些函数f,其中每个结果f [ai,bj]再次是稀疏数组.顺便提一下,所有这些稀疏阵列都具有相同的尺寸.
而
Flatten[Outer[f,{a1,...},{b1,1],1]
返回所需的结果(原则上)它似乎消耗了过多的内存.并非最不重要,因为返回值是稀疏数组的列表,而一个全面的稀疏数组在我感兴趣的情况下效率更高.
是否有一种有效的替代上述外部使用?
更具体的例子:
{SparseArray[{{1,1,1} -> 1,{2,2,2} -> 1}],SparseArray[{{1,2} -> 1,1} -> 1}],2} -> -1,1} -> -1,1} -> 1}]}; ByteCount[%] list = SparseArray[%%] ByteCount[%] Flatten[Outer[Dot,list,1]; ByteCount[%] list1x2 = SparseArray[%%] ByteCount[%] Flatten[Outer[Dot,list1x2,1]; ByteCount[%] list1x3 = SparseArray[%%] ByteCount[%]
不仅外部(稀疏数组列表)的原始中间结果非常低效,外部似乎也在计算过程中消耗了太多的内存.
解决方法
稀疏数组构造/解构API
这是代码.首先,略微修改(以解决高维稀疏数组)稀疏数组构造 – 解构API,取自this answer:
ClearAll[spart,getIC,getJR,getSparseData,getDefaultElement,makeSparseArray]; HoldPattern[spart[SparseArray[s___],p_]] := {s}[[p]]; getIC[s_SparseArray] := spart[s,4][[2,1]]; getJR[s_SparseArray] := spart[s,2]]; getSparseData[s_SparseArray] := spart[s,4][[3]]; getDefaultElement[s_SparseArray] := spart[s,3]; makeSparseArray[dims_List,jc_List,ir_List,data_List,defElem_: 0] := SparseArray @@ {Automatic,dims,defElem,{1,{jc,ir},data}};
迭代器
ClearAll[makeTwoListIterator]; makeTwoListIterator[fname_Symbol,a_List,b_List] := With[{indices = Flatten[Outer[List,a,b,1]},With[{len = Length[indices]},Module[{i = 0},ClearAll[fname]; fname[] := With[{ind = ++i},indices[[ind]] /; ind <= len]; fname[] := Null; fname[n_] := With[{ind = i + 1},i += n; indices[[ind ;; Min[len,ind + n - 1]]] /; ind <= len]; fname[n_] := Null; ]]];
请注意,我可以实现上述功能更多的内存 – 有效而不使用外部,但对于我们的目的,这不是主要关注点.
这是一个更专业的版本,它为两维索引对生成交互器.
ClearAll[make2DIndexInterator]; make2DIndexInterator[fname_Symbol,i : {iStart_,iEnd_},j : {jStart_,jEnd_}] := makeTwoListIterator[fname,Range @@ i,Range @@ j]; make2DIndexInterator[fname_Symbol,ilen_Integer,jlen_Integer] := make2DIndexInterator[fname,ilen},jlen}];
这是如何工作的:
In[14]:= makeTwoListIterator[next,{a,c},{d,e}]; next[] next[] next[] Out[15]= {a,d} Out[16]= {a,e} Out[17]= {b,d}
我们也可以使用它来获得批处理结果:
In[18]:= makeTwoListIterator[next,e}]; next[2] next[2] Out[19]= {{a,d},e}} Out[20]= {{b,{b,e}}
,我们将使用第二种形式.
SparseArray – 构建函数
此函数将通过获取数据块(也以SparseArray形式)并将它们粘合在一起来迭代地构建SparseArray对象.它基本上是在this答案中使用的代码,打包成一个函数.它接受用于生成下一个数据块的代码段,包含在Hold中(我可以选择使其成为HoldAll)
Clear[accumulateSparseArray]; accumulateSparseArray[Hold[getDataChunkCode_]] := Module[{start,ic,jr,sparseData,dataChunk},start = getDataChunkCode; ic = getIC[start]; jr = getJR[start]; sparseData = getSparseData[start]; dims = Dimensions[start]; While[True,dataChunk = getDataChunkCode; If[dataChunk === {},Break[]]; ic = Join[ic,Rest@getIC[dataChunk] + Last@ic]; jr = Join[jr,getJR[dataChunk]]; sparseData = Join[sparseData,getSparseData[dataChunk]]; dims[[1]] += First[Dimensions[dataChunk]]; ]; makeSparseArray[dims,sparseData]];
把它们放在一起
ClearAll[sparseArrayOuter]; sparseArrayOuter[f_,a_SparseArray,b_SparseArray,chunkSize_: 100] := Module[{next,wrapperF,getDataChunkCode},make2DIndexInterator[next,Length@a,Length@b]; wrapperF[x_List,y_List] := SparseArray[f @@@ Transpose[{x,y}]]; getDataChunkCode := With[{inds = next[chunkSize]},If[inds === Null,Return[{}]]; wrapperF[a[[#]] & /@ inds[[All,1]],b[[#]] & /@ inds[[All,-1]]] ]; accumulateSparseArray[Hold[getDataChunkCode]] ];
在这里,我们首先生成迭代器,它将按需提供索引对列表的部分,用于提取元素(也是SparseArrays).请注意,我们通常一次从两个大输入SparseArray-s中提取多对元素,以加快代码速度.我们一次处理多少对由可选的chunkSize参数控制,默认为100.我们然后构造代码来处理这些元素并将结果放回SparseArray,我们使用辅助函数wrapperF.迭代器的使用并非绝对必要(可以使用Reap-Sow,与其他答案一样),但允许我将迭代逻辑与稀疏数组的泛型累积逻辑分离.
基准
首先,我们准备大型稀疏数组并测试我们的功能:
In[49]:= arr = {SparseArray[{{1,1}->1,2}->1}],2}->1,1}->1}],2}->-1,1}->1}]}; In[50]:= list=SparseArray[arr] Out[50]= SparseArray[<12>,{6,2}] In[51]:= larger = sparseArrayOuter[Dot,list] Out[51]= SparseArray[<72>,{36,2}] In[52]:= (large= sparseArrayOuter[Dot,larger,larger])//Timing Out[52]= {0.047,SparseArray[<2592>,{1296,2}]} In[53]:= SparseArray[Flatten[Outer[Dot,1]]==large Out[53]= True In[54]:= MaxMemoryUsed[] Out[54]= 21347336
现在我们进行功率测试
In[55]:= (huge= sparseArrayOuter[Dot,large,2000])//Timing Out[55]= {114.344,SparseArray[<3359232>,{1679616,2}]} In[56]:= MaxMemoryUsed[] Out[56]= 536941120 In[57]:= ByteCount[huge] Out[57]= 262021120 In[58]:= (huge1 = Flatten[Outer[Dot,1]);//Timing Out[58]= {8.687,Null} In[59]:= MaxMemoryUsed[] Out[59]= 2527281392
对于这个特定的例子,建议的方法比直接使用Outer的内存效率高5倍,但速度慢约15倍.我不得不调整chunksize参数(默认为100,但对于上面我使用2000,以获得最佳速度/内存使用组合).我的方法仅用作存储最终结果所需内存的两倍的峰值.与基于外部的方法相比,内存节省程度将取决于所讨论的稀疏阵列.