
Commit 9235a36

Enable -Xsource:3-cross compiler flag
This not only eases a potential future Scala 3 migration but also makes the compiler stricter about features that have proven to be warts.
1 parent 38a1958 commit 9235a36
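
For context, -Xsource:3-cross is a standard Scala 2.13 compiler option. Below is a minimal sketch of enabling it in a generic sbt build; this is an illustration only, since Spark's actual build presumably wires the option up through its own Maven/sbt configuration.

// Illustration only: enabling the flag in a plain build.sbt.
// Spark's real build sets compiler options elsewhere; treat this as a sketch.
scalacOptions += "-Xsource:3-cross"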

File tree: 294 files changed (+1064 / -973 lines)


common/utils/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala

Lines changed: 1 addition & 1 deletion

@@ -94,7 +94,7 @@ private[spark] class TypedConfigBuilder[T](
   import ConfigHelpers._

   def this(parent: ConfigBuilder, converter: String => T) = {
-    this(parent, converter, { v: T => v.toString })
+    this(parent, converter, (v: T) => v.toString)
   }

   /** Apply a transformation to the user-provided values of the config entry. */
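
This change is typical of many of the 294 files: under -Xsource:3-cross, a lambda whose parameter carries a type annotation must put that parameter in parentheses, as Scala 3 requires. A small self-contained sketch (not Spark code) of the accepted form:

object LambdaParens {
  def main(args: Array[String]): Unit = {
    // Scala 3-compatible form: a typed lambda parameter goes in parentheses.
    val render: Int => String = (v: Int) => v.toString
    // The brace form `{ v: Int => v.toString }` is what -Xsource:3 flags.
    println(render(42))
  }
}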

connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala

Lines changed: 3 additions & 2 deletions

@@ -336,7 +336,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       // admin has a much bigger chance to hit KAFKA-7703 like issues.
       var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
       var attempt = 0
-      do {
+      while ({
         partitionOffsets = listOffsets(admin, listOffsetsParams)
         attempt += 1

@@ -349,7 +349,8 @@ private[kafka010] class KafkaOffsetReaderAdmin(
            Thread.sleep(offsetFetchAttemptIntervalMs)
          }
        }
-      } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
+        incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts
+      }) ()

       logDebug(s"Got latest offsets for partitions: $partitionOffsets")
       partitionOffsets
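
Scala 3 drops the do/while statement, so the two Kafka offset readers (and KinesisTestUtils further down) rewrite it into the equivalent `while ({ body; cond }) ()` idiom: the block runs first, its last expression becomes the loop condition, and the trailing `()` is the loop's empty body. A minimal standalone sketch, not Spark code:

object DoWhileRewrite {
  def main(args: Array[String]): Unit = {
    var attempt = 0
    // Equivalent of `do { attempt += 1 } while (attempt < 3)` without do/while.
    while ({
      attempt += 1   // the body always runs at least once
      attempt < 3    // the block's last expression is the loop condition
    }) ()
    println(s"attempts made: $attempt") // prints: attempts made: 3
  }
}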

connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala

Lines changed: 3 additions & 2 deletions

@@ -385,7 +385,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
       // - Avoid calling `consumer.poll(0)` which may cause KAFKA-7703.
       var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
       var attempt = 0
-      do {
+      while ({
        consumer.seekToEnd(partitions)
        partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
        attempt += 1
@@ -399,7 +399,8 @@ private[kafka010] class KafkaOffsetReaderConsumer(
            Thread.sleep(offsetFetchAttemptIntervalMs)
          }
        }
-      } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
+        incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts
+      }) ()

       logDebug(s"Got latest offsets for partition : $partitionOffsets")
       partitionOffsets

connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala

Lines changed: 1 addition & 1 deletion

@@ -151,7 +151,7 @@ private[consumer] class FetchedDataPool(
   private def removeIdleFetchedData(): Unit = synchronized {
     val now = clock.getTimeMillis()
     val maxAllowedReleasedTimestamp = now - minEvictableIdleTimeMillis
-    cache.values.foreach { p: CachedFetchedDataList =>
+    cache.values.foreach { (p: CachedFetchedDataList) =>
      val expired = p.filter { q =>
        !q.inUse && q.lastReleasedTimestamp < maxAllowedReleasedTimestamp
      }

connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 4 additions & 4 deletions

@@ -2362,7 +2362,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       AddKafkaData(Set(topic), 30, 31, 32), // Add data when stream is stopped
       StartStream(),
       CheckAnswer(-21, -22, -11, -12, 2, 12, 20, 21, 22, 30, 31, 32), // Should get the added data
-      AssertOnQuery("Add partitions") { query: StreamExecution =>
+      AssertOnQuery("Add partitions") { (query: StreamExecution) =>
        if (addPartitions) setTopicPartitions(topic, 10, query)
        true
       },
@@ -2413,7 +2413,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       AddKafkaData(Set(topic), 30, 31, 32), // Add data when stream is stopped
       StartStream(),
       CheckAnswer(-21, -22, -11, -12, 2, 12, 23, 24, 30, 31, 32), // Should get the added data
-      AssertOnQuery("Add partitions") { query: StreamExecution =>
+      AssertOnQuery("Add partitions") { (query: StreamExecution) =>
        if (addPartitions) setTopicPartitions(topic, 10, query)
        true
       },
@@ -2581,7 +2581,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       CheckAnswer(2, 3, 4, 5, 6, 7), // Should get the added data
       AddKafkaData(Set(topic), 7, 8),
       CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
-      AssertOnQuery("Add partitions") { query: StreamExecution =>
+      AssertOnQuery("Add partitions") { (query: StreamExecution) =>
        if (addPartitions) setTopicPartitions(topic, 10, query)
        true
       },
@@ -2622,7 +2622,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       AddKafkaData(Set(topic), 7, 8),
       StartStream(),
       CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
-      AssertOnQuery("Add partitions") { query: StreamExecution =>
+      AssertOnQuery("Add partitions") { (query: StreamExecution) =>
        if (addPartitions) setTopicPartitions(topic, 10, query)
        true
       },

connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala

Lines changed: 1 addition & 1 deletion

@@ -112,7 +112,7 @@ private[spark] class DirectKafkaInputDStream[K, V](

   private[streaming] override def name: String = s"Kafka 0.10 direct stream [$id]"

-  protected[streaming] override val checkpointData =
+  protected[streaming] override val checkpointData: DirectKafkaInputDStreamCheckpointData =
     new DirectKafkaInputDStreamCheckpointData

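The explicit result type added here is likely tied to -Xsource:3 following Scala 3's rule that an overriding member's type is inferred from the overridden member rather than from its right-hand side; annotating the val keeps the narrower subtype visible to callers. A standalone sketch with hypothetical classes, not Spark code:

// Hypothetical types, for illustration only.
class Base
class Narrow extends Base { def extra: Int = 1 }

abstract class Parent { val data: Base }

class Child extends Parent {
  // Without an annotation, Scala 3-style override inference would type `data` as Base;
  // the explicit Narrow keeps the more precise type visible to callers.
  override val data: Narrow = new Narrow
}

object OverrideInference {
  def main(args: Array[String]): Unit =
    println(new Child().data.extra) // works because data is declared as Narrow
}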

connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala

Lines changed: 3 additions & 2 deletions

@@ -176,10 +176,11 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi

   private def findNonExistentStreamName(): String = {
     var testStreamName: String = null
-    do {
+    while ({
       Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
       testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}"
-    } while (describeStream(testStreamName).nonEmpty)
+      describeStream(testStreamName).nonEmpty
+    }) ()
     testStreamName
   }

connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala

Lines changed: 1 addition & 1 deletion

@@ -322,7 +322,7 @@ private[sql] class ProtobufSerializer(
       .unzip

     val numFields = catalystStruct.length
-    row: InternalRow =>
+    (row: InternalRow) =>
      val result = DynamicMessage.newBuilder(descriptor)
      var i = 0
      while (i < numFields) {

core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala

Lines changed: 1 addition & 1 deletion

@@ -29,7 +29,7 @@ import org.apache.spark.rdd.HadoopRDD

 @DeveloperApi
 class JavaHadoopRDD[K, V](rdd: HadoopRDD[K, V])
-    (implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
+    (implicit override val kClassTag: ClassTag[K], override val vClassTag: ClassTag[V])
   extends JavaPairRDD[K, V](rdd) {

   /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
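
The duplicated `implicit` modifier dropped here (and in JavaNewHadoopRDD below) is redundant in Scala 2, where the keyword already applies to the whole parameter clause, and it is not part of Scala 3's syntax, which is presumably why -Xsource:3-cross objects to it. A minimal sketch with a hypothetical class:

import scala.reflect.ClassTag

// Hypothetical class, for illustration only: `implicit` is written once and
// covers every parameter in the clause, so repeating it is redundant.
class TaggedPair[K, V](k: K, v: V)(implicit val kTag: ClassTag[K], val vTag: ClassTag[V]) {
  override def toString: String = s"TaggedPair($k: $kTag, $v: $vTag)"
}

object TaggedPairDemo {
  def main(args: Array[String]): Unit =
    println(new TaggedPair(1, "a")) // the ClassTags are supplied implicitly
}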

core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala

Lines changed: 1 addition & 1 deletion

@@ -29,7 +29,7 @@ import org.apache.spark.rdd.NewHadoopRDD

 @DeveloperApi
 class JavaNewHadoopRDD[K, V](rdd: NewHadoopRDD[K, V])
-    (implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
+    (implicit override val kClassTag: ClassTag[K], override val vClassTag: ClassTag[V])
   extends JavaPairRDD[K, V](rdd) {

   /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
