如何在函数中不使用 sparkContext 的 spark 函数中读取来自 HDFS 的文件。
例子:
val filedata_rdd = rdd.map { x => ReadFromHDFS(x.getFilePath) }
问题是如何实现 ReadFromHDFS?通常从 HDFS 读取我们可以做一个 sc.textFile 但在这种情况下 sc 不能在函数中使用。
最佳答案
您不一定需要服务上下文来与 HDFS 交互。您可以简单地从 master 广播 hadoop 配置,并在执行器上使用广播的配置值来构造一个 hadoop.fs.FileSystem
。那么世界就是你的。 :)
代码如下:
import java.io.StringWriter
import com.sachin.util.SparkIndexJobHelper._
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SerializableWritable, SparkConf}
class Test {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[15]")
.setAppName("TestJob")
val sc = createSparkContext(conf)
val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
val rdd: RDD[String] = ??? // your existing rdd
val filedata_rdd = rdd.map { x => readFromHDFS(confBroadcast.value.value, x) }
}
def readFromHDFS(configuration: Configuration, path: String): String = {
val fs: FileSystem = FileSystem.get(configuration)
val inputStream = fs.open(new Path(path));
val writer = new StringWriter();
IOUtils.copy(inputStream, writer, "UTF-8");
writer.toString();
}
}
https://stackoverflow.com/questions/40185692/