Access denied when writing to an S3 bucket using RDD

Learn how to resolve an access denied error when writing to an S3 bucket using RDD.

Written by Adam Pavlacka

Last published at: May 31st, 2022

Problem

Writing to an S3 bucket using RDDs fails. The driver node can write, but the worker (executor) nodes return an access denied error. Writing with the DataFrame API, however, works fine.

For example, assume you run the following code:

%scala

import java.io.File
import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import java.net.URI
import scala.collection.mutable
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream

val ssc = new StreamingContext(sc, Seconds(10))
val rdd1 = sc.parallelize(Seq(1,2))
val rdd2 = sc.parallelize(Seq(3,4))
val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1,rdd2))
val result = inputStream.map(x => x*x)
val count = result.foreachRDD { rdd =>
  val config = new Configuration(sc.hadoopConfiguration) with Serializable
  rdd.mapPartitions {
    _.map { entry =>
      val fs = FileSystem.get(URI.create("s3://dx.lz.company.fldr.dev/part_0000000-123"), config)
      val path = new Path("s3://dx.lz.company.fldr.dev/part_0000000-123")
      val file = fs.create(path)
      file.write("foobar".getBytes)
      file.close()
    }
  }.count()
}
println(s"Count is $count")
ssc.start()

The following error is returned:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 7, 10.205.244.228, executor 0): java.rmi.RemoteException: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied; Request ID: F81ADFACBCDFE626, Extended Request ID: 1dncbuhsmuffi9a1lz0ygt4dnrjdy5v3c+J/DiEeg8Z4tMOLphZwW2U+sdxmr8fluQZ1R/3bcep,

Cause

When you write from the worker nodes using an RDD, the IAM policy denies access if you serialize the Hadoop configuration with Serializable, as in val config = new Configuration(sc.hadoopConfiguration) with Serializable.
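In other words, the pattern to look for on the executor side is roughly the following (a minimal sketch based on the example above; the variable names badConfig and goodConfig are illustrative, and the fix shown in the second half is the one covered in Option 2 below):

%scala

import org.apache.hadoop.conf.Configuration
import org.apache.spark.util.SerializableConfiguration

// Problematic: mixing Serializable into the Hadoop Configuration and then
// using it inside an RDD closure results in Access Denied on the executors.
val badConfig = new Configuration(sc.hadoopConfiguration) with Serializable

// Works: wrap the configuration in Spark's SerializableConfiguration and
// broadcast it to the executors (see Option 2 below).
val goodConfig = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))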

Solution

There are two ways to resolve this issue:

Option 1: Use DataFrames

%scala

dbutils.fs.put("s3a://dx.lz.company.fldr.dev/test-gopi/test0.txt", "foobar")
val df = spark.read.text("s3a://dx.lz.company.fldr.dev/test-gopi/test0.txt")
df.write.text("s3a://dx.lz.company.fldr.dev/test-gopi/text1.txt")
val df1 = spark.read.text("s3a://dx.lz.company.fldr.dev/test-gopi/text1.txt")
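If your data starts out as an RDD, as in the streaming example from the Problem section, one way to apply the same idea is to convert the RDD to a DataFrame inside foreachRDD and let Spark perform the distributed write. The sketch below is not from the original article: it assumes a Databricks notebook where spark and sc are predefined, and the bucket path is a hypothetical placeholder.

%scala

import scala.collection.mutable
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
val rdd1 = sc.parallelize(Seq(1,2))
val rdd2 = sc.parallelize(Seq(3,4))
val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1, rdd2))
val result = inputStream.map(x => x*x)

result.foreachRDD { rdd =>
  // Convert the RDD to a DataFrame and use the DataFrame writer; no Hadoop
  // Configuration needs to be serialized by hand.
  import spark.implicits._
  rdd.toDF("value")
    .write
    .mode("append")
    .parquet("s3a://<your-bucket>/path/")  // hypothetical placeholder path
}
ssc.start()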

Option 2: Use SerializableConfiguration

If you want to use RDDs, use:

%scala

val config = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

For example:

%scala

import java.io.File
import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import java.net.URI
import scala.collection.mutable
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.util.SerializableConfiguration

val ssc = new StreamingContext(sc, Seconds(10))
val rdd1 = sc.parallelize(Seq(1,2))
val rdd2 = sc.parallelize(Seq(3,4))
val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1,rdd2))
val result = inputStream.map(x => x*x)
val count = result.foreachRDD { rdd =>
  //val config = new Configuration(sc.hadoopConfiguration) with Serializable
  // Broadcast a SerializableConfiguration instead of mixing in Serializable.
  val config = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
  rdd.mapPartitions {
    _.map { entry =>
      val fs = FileSystem.get(URI.create("s3://pathpart_0000000-123"), config.value.value)
      val path = new Path("s3:///path/part_0000000-123")
      val file = fs.create(path)
      file.write("foobar".getBytes)
      file.close()
    }
  }.count()
}
println(s"Count is $count")
ssc.start()
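Here SerializableConfiguration (from org.apache.spark.util) wraps the Hadoop configuration with proper serialization support, and broadcasting it makes it available on each executor. In config.value.value, the first .value unwraps the broadcast variable and the second .value returns the wrapped Hadoop Configuration, so the executors build the FileSystem from the same configuration as the driver.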

