apache-spark - 从 Spark 转换函数中动态地从 HDFS 读取文件

如何在函数中不使用 sparkContext 的 spark 函数中读取来自 HDFS 的文件。

例子:

val filedata_rdd = rdd.map { x => ReadFromHDFS(x.getFilePath) }

问题是如何实现 ReadFromHDFS?通常从 HDFS 读取我们可以做一个 sc.textFile 但在这种情况下 sc 不能在函数中使用。

最佳答案

您不一定需要服务上下文来与 HDFS 交互。您可以简单地从 master 广播 hadoop 配置,并在执行器上使用广播的配置值来构造一个 hadoop.fs.FileSystem。那么世界就是你的。 :)

代码如下:

import java.io.StringWriter

import com.sachin.util.SparkIndexJobHelper._
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SerializableWritable, SparkConf}

class Test {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[15]")
      .setAppName("TestJob")
    val sc = createSparkContext(conf)

    val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))

    val rdd: RDD[String] = ??? // your existing rdd
    val filedata_rdd = rdd.map { x => readFromHDFS(confBroadcast.value.value, x) }

  }

  def readFromHDFS(configuration: Configuration, path: String): String = {
    val fs: FileSystem = FileSystem.get(configuration)
    val inputStream = fs.open(new Path(path));

    val writer = new StringWriter();
    IOUtils.copy(inputStream, writer, "UTF-8");
    writer.toString();
  }

}

https://stackoverflow.com/questions/40185692/

相关文章:

indexing - 确定 Teradata 中表的主索引

php - 基于其他数组排序数组

cuda - 从未对齐的 uint8_t 重铸为 uint32_t 数组读取 - 未获取所有值

c# - 如何从 MVC razor c# 中的动态模型获取属性值

Python/Django - 需要一个字符串或类似字节的对象

vb.net - 为什么异步函数返回 System.Threading.Tasks.Task`1[S

json - R + fromJSON - 如何发送标题信息?

spring - 基于模型变量+Spring表单+JSP的选中单选按钮

php - 将变量从按钮传递到 Controller Laravel

xcode - 我找不到 podfile