python – 如何分区保存R函数的pyspark RDD

import rpy2.robjects as robjects

dffunc = sc.parallelize([(0,robjects.r.rnorm),(1,robjects.r.runif)])
dffunc.collect() 

输出

[(0, <rpy2.rinterface.SexpClosure - Python:0x7f2ecfc28618 / R:0x26abd18>), (1, <rpy2.rinterface.SexpClosure - Python:0x7f2ecfc283d8 / R:0x26aad28>)]

虽然分区版本会导致错误:

dffuncpart = dffunc.partitionBy(2)
dffuncpart.collect()
RuntimeError: ('R cannot evaluate code before being initialized.', <built-in function unserialize>

看起来这个错误是R没有加载到其中一个分区上,我认为这意味着没有执行第一个导入步骤.有没有办法解决?

编辑1第二个例子让我认为pyspark或rpy2的时间存在错误.

dffunc = sc.parallelize([(0,robjects.r.rnorm),     (1,robjects.r.runif)]).partitionBy(2)
def loadmodel(model):
    import rpy2.robjects as robjects
    return model[1](2)
dffunc.map(loadmodel).collect()

产生相同的错误R在初始化之前无法评估代码.

dffuncpickle = sc.parallelize([(0,pickle.dumps(robjects.r.rnorm)),(1,pickle.dumps(robjects.r.runif))]).partitionBy(2)
def loadmodelpickle(model):
    import rpy2.robjects as robjects
    import pickle
    return pickle.loads(model[1])(2)
dffuncpickle.map(loadmodelpickle).collect()

按预期工作.

最佳答案 我想说“这不是rpy2中的错误,这是一个功能”但我实际上必须解决“这是一个限制”.

发生的事情是rpy2有2 interface levels.一个是低级别的(更靠近R的C API)并且可以通过rpy2.rinterface获得,另一个是具有更多铃声和口哨声的高级接口,更多的是“pythonic” ,以及继承自rinterface level-one的R对象的类(最后一部分对于下面关于酸洗的部分很重要).如果需要,导入高级接口会导致使用默认参数初始化(启动)嵌入式R.导入低级接口rinterface没有这种副作用,并且必须显式执行嵌入式R的初始化(函数initr). rpy2是这样设计的,因为嵌入式R的初始化可以有参数:首先导入rpy2.rinterface,设置初始化,然后导入rpy2.robjects使这成为可能.

除此之外,rpy2包装的R对象的序列化(酸洗)目前仅在rinterface级别定义(参见documentation).酸洗robjects级(高级)rpy2对象正在使用rinterface级代码,当取消它们时,它们将保持在较低级别(Python pickle包含定义对象类的模块并将导入该模块 – 这里是rinterface,它并不意味着嵌入式R的初始化.事情就是这样的原因很简单,就是“现在已经足够好了”:在实现这个时,我不得不同时想到一种很好的方法来桥接两种不同的语言并通过Python C-API学习我的方式pickle / unpickling Python对象.考虑到人们可以轻松编写类似的东西

import rpy2.robjects

要么

import rpy2.rinterface
rpy2.rinterface.initr()

在去除之前,这从未被重新审视过.我所知道的rpy2酸洗的用途是使用Python的多处理(并且在初始化子进程的代码中添加类似于import语句的东西是一种廉价且充分的修复).愿这是再次审视这个问题的时候了.如果是这样,请提交rpy2的错误报告.

编辑:这无疑是rpy2的一个问题.腌制的robjects级别对象应该取消回到robjects级别,而不是rinterface级别.我已经打开了一个issue in the rpy2 tracker(并且已经在默认的/ dev分支中推送了一个基​​本的补丁).

第2次编辑:补丁是从版本2.7.7开始发布的rpy2的一部分(在撰写本文时的最新版本是2.7.8).

点赞