
Enable -Xsource:3 compiler flag #50474


Open · wants to merge 4 commits into master
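For context: `-Xsource:3` asks the Scala 2.13 compiler to flag (and in places adopt) Scala 3 syntax and semantics, which is what drives the mechanical rewrites in the hunks below: parenthesized lambda parameters, no `do`/`while`, a single `implicit` per parameter clause, backticked `using`, and explicit types on some overrides. The PR's actual build change is not visible in this file list, so the following sbt snippet is only an illustrative sketch of how the flag is typically enabled:

```scala
// build.sbt: hypothetical minimal project; the PR's real build change
// (Spark uses Maven plus project/SparkBuild.scala) is not shown in this diff view.
ThisBuild / scalaVersion := "2.13.14"

// -Xsource:3 asks the 2.13 compiler to apply Scala 3 migration checks,
// which is what forces the rewrites in the hunks below.
ThisBuild / scalacOptions += "-Xsource:3"
```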
@@ -94,7 +94,7 @@ private[spark] class TypedConfigBuilder[T](
import ConfigHelpers._

def this(parent: ConfigBuilder, converter: String => T) = {
this(parent, converter, { v: T => v.toString })
this(parent, converter, (v: T) => v.toString)
}

/** Apply a transformation to the user-provided values of the config entry. */
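This is the first instance of a pattern repeated throughout the PR: under `-Xsource:3`, a function-literal parameter that carries a type annotation has to be parenthesized, matching Scala 3 syntax. A self-contained sketch of the before/after shape (names here are illustrative, not Spark's):

```scala
object LambdaParens {
  val xs: Seq[String] = Seq("1", "2", "3")

  // Scala 2 tolerated a bare annotated parameter inside braces:
  //   xs.map { s: String => s.toInt }
  // With -Xsource:3 the annotated parameter must be parenthesized, as in Scala 3:
  val ints: Seq[Int] = xs.map { (s: String) => s.toInt }

  def main(args: Array[String]): Unit = println(ints) // List(1, 2, 3)
}
```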
@@ -336,7 +336,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
// admin has a much bigger chance to hit KAFKA-7703 like issues.
var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
var attempt = 0
do {
while ({
partitionOffsets = listOffsets(admin, listOffsetsParams)
attempt += 1

@@ -349,7 +349,8 @@ private[kafka010] class KafkaOffsetReaderAdmin(
Thread.sleep(offsetFetchAttemptIntervalMs)
}
}
} while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts
}) ()

logDebug(s"Got latest offsets for partitions: $partitionOffsets")
partitionOffsets
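The rewrite above replaces `do { ... } while (cond)`, which Scala 3 removes, with the equivalent `while ({ body; cond }) ()` form; the same transformation appears again in KafkaOffsetReaderConsumer, KinesisTestUtils, and TaskSchedulerImpl below. A minimal standalone sketch (the retry loop and names are made up):

```scala
object DoWhileRewrite {
  def main(args: Array[String]): Unit = {
    var attempt = 0

    // Scala 2 only:
    //   do { attempt += 1 } while (attempt < 3)

    // Scala 3-compatible equivalent: the block is evaluated as the condition,
    // so the body always runs at least once, and the trailing () is the empty loop body.
    while ({
      attempt += 1
      attempt < 3
    }) ()

    println(s"ran $attempt times") // ran 3 times
  }
}
```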
@@ -385,7 +385,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
// - Avoid calling `consumer.poll(0)` which may cause KAFKA-7703.
var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
var attempt = 0
do {
while ({
consumer.seekToEnd(partitions)
partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
attempt += 1
@@ -399,7 +399,8 @@ private[kafka010] class KafkaOffsetReaderConsumer(
Thread.sleep(offsetFetchAttemptIntervalMs)
}
}
} while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts
}) ()

logDebug(s"Got latest offsets for partition : $partitionOffsets")
partitionOffsets
@@ -151,7 +151,7 @@ private[consumer] class FetchedDataPool(
private def removeIdleFetchedData(): Unit = synchronized {
val now = clock.getTimeMillis()
val maxAllowedReleasedTimestamp = now - minEvictableIdleTimeMillis
cache.values.foreach { p: CachedFetchedDataList =>
cache.values.foreach { (p: CachedFetchedDataList) =>
val expired = p.filter { q =>
!q.inUse && q.lastReleasedTimestamp < maxAllowedReleasedTimestamp
}
@@ -2362,7 +2362,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
AddKafkaData(Set(topic), 30, 31, 32), // Add data when stream is stopped
StartStream(),
CheckAnswer(-21, -22, -11, -12, 2, 12, 20, 21, 22, 30, 31, 32), // Should get the added data
AssertOnQuery("Add partitions") { query: StreamExecution =>
AssertOnQuery("Add partitions") { (query: StreamExecution) =>
if (addPartitions) setTopicPartitions(topic, 10, query)
true
},
@@ -2413,7 +2413,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
AddKafkaData(Set(topic), 30, 31, 32), // Add data when stream is stopped
StartStream(),
CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24, 30, 31, 32), // Should get the added data
AssertOnQuery("Add partitions") { query: StreamExecution =>
AssertOnQuery("Add partitions") { (query: StreamExecution) =>
if (addPartitions) setTopicPartitions(topic, 10, query)
true
},
@@ -2581,7 +2581,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
CheckAnswer(2, 3, 4, 5, 6, 7), // Should get the added data
AddKafkaData(Set(topic), 7, 8),
CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
AssertOnQuery("Add partitions") { query: StreamExecution =>
AssertOnQuery("Add partitions") { (query: StreamExecution) =>
if (addPartitions) setTopicPartitions(topic, 10, query)
true
},
@@ -2622,7 +2622,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
AddKafkaData(Set(topic), 7, 8),
StartStream(),
CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
AssertOnQuery("Add partitions") { query: StreamExecution =>
AssertOnQuery("Add partitions") { (query: StreamExecution) =>
if (addPartitions) setTopicPartitions(topic, 10, query)
true
},
@@ -112,7 +112,7 @@ private[spark] class DirectKafkaInputDStream[K, V](

private[streaming] override def name: String = s"Kafka 0.10 direct stream [$id]"

protected[streaming] override val checkpointData =
protected[streaming] override val checkpointData: DirectKafkaInputDStreamCheckpointData =
new DirectKafkaInputDStreamCheckpointData


@@ -176,10 +176,11 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi

private def findNonExistentStreamName(): String = {
var testStreamName: String = null
do {
while ({
Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}"
} while (describeStream(testStreamName).nonEmpty)
describeStream(testStreamName).nonEmpty
}) ()
testStreamName
}

@@ -322,7 +322,7 @@ private[sql] class ProtobufSerializer(
.unzip

val numFields = catalystStruct.length
row: InternalRow =>
(row: InternalRow) =>
val result = DynamicMessage.newBuilder(descriptor)
var i = 0
while (i < numFields) {
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.HadoopRDD

@DeveloperApi
class JavaHadoopRDD[K, V](rdd: HadoopRDD[K, V])
(implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
(implicit override val kClassTag: ClassTag[K], override val vClassTag: ClassTag[V])
extends JavaPairRDD[K, V](rdd) {

/** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
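This hunk, together with the matching ones in JavaNewHadoopRDD and JavaPairRDD below, drops the repeated `implicit` modifier: the clause is marked implicit once at its start, and `-Xsource:3` rejects the redundant per-parameter form. A small sketch with hypothetical names:

```scala
import scala.reflect.ClassTag

// Scala 2 also accepted the redundant per-parameter modifier:
//   class Pair[K, V](val k: K, val v: V)(implicit val kTag: ClassTag[K], implicit val vTag: ClassTag[V])
// Under -Xsource:3 the clause is marked implicit exactly once:
class Pair[K, V](val k: K, val v: V)(implicit val kTag: ClassTag[K], val vTag: ClassTag[V])

object Pair {
  def describe[K, V](p: Pair[K, V]): String =
    s"Pair[${p.kTag.runtimeClass.getSimpleName}, ${p.vTag.runtimeClass.getSimpleName}](${p.k}, ${p.v})"

  def main(args: Array[String]): Unit =
    println(describe(new Pair(1, "one"))) // Pair[int, String](1, one)
}
```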
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.NewHadoopRDD

@DeveloperApi
class JavaNewHadoopRDD[K, V](rdd: NewHadoopRDD[K, V])
(implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
(implicit override val kClassTag: ClassTag[K], override val vClassTag: ClassTag[V])
extends JavaPairRDD[K, V](rdd) {

/** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
@@ -45,7 +45,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
(implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V])
(implicit val kClassTag: ClassTag[K], val vClassTag: ClassTag[V])
extends AbstractJavaRDDLike[(K, V), JavaPairRDD[K, V]] {

override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
@@ -30,7 +30,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.sys.process._

import org.json4s.Formats
import org.json4s._
import org.json4s.jackson.JsonMethods

import org.apache.spark.{SparkConf, SparkContext}
@@ -408,11 +408,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
resourceProfileId: Int)

def main(args: Array[String]): Unit = {
val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.bindAddress, arguments.hostname, arguments.cores,
env, arguments.resourcesFileOpt, resourceProfile)
val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) => CoarseGrainedExecutorBackend = {
case (rpcEnv, arguments, env, resourceProfile) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.bindAddress, arguments.hostname, arguments.cores,
env, arguments.resourcesFileOpt, resourceProfile)
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
System.exit(0)
@@ -244,7 +244,7 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
new BufferedOutputStream(os, bufferSize)
}

override private[spark] def compressedContinuousOutputStream(s: OutputStream) = {
override private[spark] def compressedContinuousOutputStream(s: OutputStream): OutputStream = {
// SPARK-29322: Set "closeFrameOnFlush" to 'true' to let continuous input stream not being
// stuck on reading open frame.
val os = new ZstdOutputStreamNoFinalizer(s, bufferPool)
@@ -96,7 +96,7 @@ class CoGroupedRDD[K: ClassTag](
}

override def getDependencies: Seq[Dependency[_]] = {
rdds.map { rdd: RDD[_] =>
rdds.map { (rdd: RDD[_]) =>
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -87,7 +87,7 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}

override val partitioner = Some(part)
override val partitioner: Some[Partitioner] = Some(part)

override def getPartitions: Array[Partition] = {
Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
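Several hunks in this PR (checkpointData, shuffleBlockResolver, compressedContinuousOutputStream, and the `partitioner` vals here and in SubtractedRDD) add explicit result types to overriding members; presumably the point is to pin the public signature rather than rely on inference, and it makes the narrowing visible: the inferred type here would be `Some[Partitioner]`, while SubtractedRDD annotates the wider `Option[Partitioner]`. A minimal sketch with hypothetical classes:

```scala
trait Source {
  def partitioner: Option[String]
}

class NarrowSource extends Source {
  // Without the annotation the inferred type is Some[String];
  // spelling it out makes the narrowing an explicit part of the API.
  override val partitioner: Some[String] = Some("hash")
}

class WideSource extends Source {
  // Annotating with Option[String] keeps the signature at the trait's type.
  override val partitioner: Option[String] = Some("range")
}

object OverrideTypes {
  def main(args: Array[String]): Unit = {
    // .value is only available because NarrowSource exposes Some[String], not Option[String].
    println(new NarrowSource().partitioner.value) // hash
    println(new WideSource().partitioner)         // Some(range)
  }
}
```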
@@ -84,7 +84,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
array
}

override val partitioner = Some(part)
override val partitioner: Option[Partitioner] = Some(part)

override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
val partition = p.asInstanceOf[CoGroupPartition]
@@ -97,8 +97,7 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]
}
}

private[spark] class ZippedPartitionsRDD3
[A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag](
private[spark] class ZippedPartitionsRDD3[A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag](
sc: SparkContext,
var f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
var rdd1: RDD[A],
@@ -123,8 +122,8 @@ private[spark] class ZippedPartitionsRDD3
}
}

private[spark] class ZippedPartitionsRDD4
[A: ClassTag, B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](
private[spark] class ZippedPartitionsRDD4[
A: ClassTag, B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](
sc: SparkContext,
var f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
var rdd1: RDD[A],
@@ -21,6 +21,7 @@ import scala.util.control.NonFatal

import org.json4s.{DefaultFormats, Extraction, Formats, JValue}
import org.json4s.jackson.JsonMethods._
import org.json4s.jvalue2extractable

import org.apache.spark.SparkException
import org.apache.spark.annotation.Evolving
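This hunk and the next one add an explicit import of json4s's `jvalue2extractable` implicit conversion (and an earlier hunk widens `import org.json4s.Formats` to `import org.json4s._`); presumably this keeps the `.extract[T]` enrichment on `JValue` resolving under the stricter `-Xsource:3` rules. The exact compiler rule isn't stated in the diff, so the sketch below only illustrates the call pattern those imports support (json4s-jackson assumed on the classpath):

```scala
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods._
import org.json4s.jvalue2extractable // implicit conversion that adds .extract[T] to JValue

object ExtractSketch {
  implicit val formats: Formats = DefaultFormats

  // Hypothetical payload shape, just to exercise extraction.
  final case class Endpoint(host: String, port: Int)

  def main(args: Array[String]): Unit = {
    val endpoint = parse("""{"host":"localhost","port":8080}""").extract[Endpoint]
    println(endpoint) // Endpoint(localhost,8080)
  }
}
```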
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal

import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods._
import org.json4s.jvalue2extractable

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
@@ -573,15 +573,16 @@ private[spark] class TaskSchedulerImpl(
var globalMinLocality: Option[TaskLocality] = None
for (currentMaxLocality <- taskSet.myLocalityLevels) {
var launchedTaskAtCurrentMaxLocality = false
do {
while ({
val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus,
availableResources, tasks)
launchedTaskAtCurrentMaxLocality = minLocality.isDefined
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
noDelaySchedulingRejects &= noDelayScheduleReject
globalMinLocality = minTaskLocality(globalMinLocality, minLocality)
} while (launchedTaskAtCurrentMaxLocality)
launchedTaskAtCurrentMaxLocality
}) ()
}

if (!legacyLocalityWaitReset) {
@@ -81,7 +81,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager

private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)

override val shuffleBlockResolver =
override val shuffleBlockResolver: IndexShuffleBlockResolver =
new IndexShuffleBlockResolver(conf, taskIdMapsForShuffle = taskIdMapsForShuffle)

/**
20 changes: 10 additions & 10 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -85,8 +85,8 @@ class ExecutorStageSummary private[spark](
val diskBytesSpilled : Long,
@deprecated("use isExcludedForStage instead", "3.1.0")
val isBlacklistedForStage: Boolean,
@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
@JsonSerialize(`using` = classOf[ExecutorMetricsJsonSerializer])
@JsonDeserialize(`using` = classOf[ExecutorMetricsJsonDeserializer])
val peakMemoryMetrics: Option[ExecutorMetrics],
val isExcludedForStage: Boolean)

@@ -125,8 +125,8 @@ class ExecutorSummary private[spark](
val memoryMetrics: Option[MemoryMetrics],
@deprecated("use excludedInStages instead", "3.1.0")
val blacklistedInStages: Set[Int],
@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
@JsonSerialize(`using` = classOf[ExecutorMetricsJsonSerializer])
@JsonDeserialize(`using` = classOf[ExecutorMetricsJsonDeserializer])
val peakMemoryMetrics: Option[ExecutorMetrics],
val attributes: Map[String, String],
val resources: Map[String, ResourceInformation],
@@ -165,9 +165,9 @@ private[spark] class ExecutorMetricsJsonSerializer
if (metrics.isEmpty) {
jsonGenerator.writeNull()
} else {
metrics.foreach { m: ExecutorMetrics =>
metrics.foreach { metrics =>
val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) =>
metric -> m.getMetricValue(metric)
metric -> metrics.getMetricValue(metric)
}
jsonGenerator.writeObject(metricsMap)
}
@@ -310,8 +310,8 @@ class StageData private[spark](
val speculationSummary: Option[SpeculationStageSummary],
val killedTasksSummary: Map[String, Int],
val resourceProfileId: Int,
@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
@JsonSerialize(`using` = classOf[ExecutorMetricsJsonSerializer])
@JsonDeserialize(`using` = classOf[ExecutorMetricsJsonDeserializer])
val peakExecutorMetrics: Option[ExecutorMetrics],
val taskMetricsDistributions: Option[TaskMetricDistributions],
val executorMetricsDistributions: Option[ExecutorMetricsDistributions],
@@ -448,11 +448,11 @@ class ExecutorMetricsDistributions private[spark](
val shuffleWriteRecords: IndexedSeq[Double],
val memoryBytesSpilled: IndexedSeq[Double],
val diskBytesSpilled: IndexedSeq[Double],
@JsonSerialize(using = classOf[ExecutorPeakMetricsDistributionsJsonSerializer])
@JsonSerialize(`using` = classOf[ExecutorPeakMetricsDistributionsJsonSerializer])
val peakMemoryMetrics: ExecutorPeakMetricsDistributions
)

@JsonSerialize(using = classOf[ExecutorPeakMetricsDistributionsJsonSerializer])
@JsonSerialize(`using` = classOf[ExecutorPeakMetricsDistributionsJsonSerializer])
class ExecutorPeakMetricsDistributions private[spark](
val quantiles: IndexedSeq[Double],
val executorMetrics: IndexedSeq[ExecutorMetrics]) {
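The backticks added around `using` in the Jackson annotation arguments above are needed because `using` becomes a soft keyword for context arguments in Scala 3, so `-Xsource:3` no longer accepts it as a bare named argument. A standalone sketch of the same collision with a made-up method:

```scala
object UsingKeyword {
  // A parameter that happens to be called `using`, as in Jackson's @JsonSerialize(using = ...).
  def register(`using`: String, retries: Int = 1): String = {
    val codec = `using`
    s"registered with $codec (retries=$retries)"
  }

  def main(args: Array[String]): Unit = {
    // Scala 2 accepted:  register(using = "zstd")
    // With -Xsource:3 the named argument is written with backticks:
    println(register(`using` = "zstd"))
  }
}
```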
@@ -585,7 +585,7 @@ class BlockManagerMasterEndpoint(
private def removeBlockFromWorkers(blockId: BlockId): Unit = {
val locations = blockLocations.get(blockId)
if (locations != null) {
locations.foreach { blockManagerId: BlockManagerId =>
locations.foreach { blockManagerId =>
val blockManager = blockManagerInfo.get(blockManagerId)
blockManager.foreach { bm =>
// Remove the block from the BlockManager.
@@ -602,7 +602,7 @@

// Return a map from the block manager id to max memory and remaining memory.
private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
blockManagerInfo.map { case(blockManagerId, info) =>
blockManagerInfo.map { case (blockManagerId, info) =>
(blockManagerId, (info.maxMem, info.remainingMem))
}.toMap
}