
[SPARK-21259] More rules for scalastyle #18471


Closed
wants to merge 8 commits into from
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -68,7 +68,9 @@ class Accumulable[R, T] private (
}

private[spark] def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = {
// scalastyle:off
this(initialValue, param, name, false /* countFailedValues */)
Contributor:

nit: countFailedValues = false is better; this trailing-comment style is only useful in Java.
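For illustration, a minimal sketch of the two call styles (the helper below is hypothetical, not the actual Accumulable constructor):

```scala
object NamedArgExample {
  // Hypothetical helper mirroring a constructor that takes a boolean flag.
  def register(name: String, countFailedValues: Boolean): Unit = ()

  register("acc", false /* countFailedValues */)  // comment style; only needed in Java
  register("acc", countFailedValues = false)      // named argument; clearer in Scala
}
```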

// scalastyle:on
Member:

I believe we could enable and disable the specific rule via scalastyle:off/on with its custom id, rather than switching off all style rules here, IIRC.
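For reference, scalastyle can toggle a single rule by its customId instead of all rules at once; a sketch, assuming a placeholder id for whatever this PR registers in scalastyle-config.xml:

```scala
class Example(initialValue: Int, flag: Boolean) {
  def this(initialValue: Int) = {
    // "mycheck" is a placeholder; use the customId registered in scalastyle-config.xml
    // scalastyle:off mycheck
    this(initialValue, false /* flag */)
    // scalastyle:on mycheck
  }
}
```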

Member:

Oh, nvm. I just read your commit log ...

}

def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -174,7 +174,7 @@ trait JobSubmitter {
* jobs.
*/
@DeveloperApi
class ComplexFutureAction[T](run : JobSubmitter => Future[T])
class ComplexFutureAction[T](run: JobSubmitter => Future[T])
extends FutureAction[T] { self =>

@volatile private var _cancelled = false
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -105,7 +105,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* as the `partitions` parameter, in the case where the number of sampled records is less than
* the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
class RangePartitioner[K: Ordering: ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
@@ -253,7 +253,7 @@ private[spark] object RangePartitioner {
* @param sampleSizePerPartition max sample size per partition
* @return (total number of items, an array of (partitionId, number of items, sample))
*/
def sketch[K : ClassTag](
def sketch[K: ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
@@ -276,7 +276,7 @@ private[spark] object RangePartitioner {
* @param partitions number of partitions
* @return selected bounds
*/
def determineBounds[K : Ordering : ClassTag](
def determineBounds[K: Ordering: ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -202,7 +202,7 @@ private[spark] class SecurityManager(
stringToSet(sparkConf.get("spark.admin.acls", ""))

// admin group acls should be set before view or modify group acls
private var adminAclsGroups : Set[String] =
private var adminAclsGroups: Set[String] =
stringToSet(sparkConf.get("spark.admin.acls.groups", ""))

private var viewAcls: Set[String] = _
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -595,7 +595,7 @@ private[spark] object SparkConf extends Logging {
DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more")
)

Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
Map(configs.map { cfg => (cfg.key -> cfg) }: _*)
}

/**
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2869,7 +2869,7 @@ private[spark] class WritableFactory[T](

object WritableFactory {

private[spark] def simpleWritableFactory[T: ClassTag, W <: Writable : ClassTag](convert: T => W)
private[spark] def simpleWritableFactory[T: ClassTag, W <: Writable: ClassTag](convert: T => W)
: WritableFactory[T] = {
val writableClass = implicitly[ClassTag[W]].runtimeClass.asInstanceOf[Class[W]]
new WritableFactory[T](_ => writableClass, convert)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -192,7 +192,7 @@ private[spark] class ThrowableSerializationWrapper(var exception: Throwable) ext
try {
exception = in.readObject().asInstanceOf[Throwable]
} catch {
case e : Exception => log.warn("Task exception could not be deserialized", e)
case e: Exception => log.warn("Task exception could not be deserialized", e)
}
}
}
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -54,7 +54,7 @@ private[spark] object JavaUtils {

override def iterator: ju.Iterator[ju.Map.Entry[A, B]] = new ju.Iterator[ju.Map.Entry[A, B]] {
val ui = underlying.iterator
var prev : Option[A] = None
var prev: Option[A] = None

def hasNext: Boolean = ui.hasNext

@@ -65,7 +65,7 @@
import scala.util.hashing.byteswap32
override def getKey: A = k
override def getValue: B = v
override def setValue(v1 : B): B = self.put(k, v1)
override def setValue(v1: B): B = self.put(k, v1)
override def hashCode: Int = byteswap32(k.hashCode) + (byteswap32(v.hashCode) << 16)
override def equals(other: Any): Boolean = other match {
case e: ju.Map.Entry[_, _] => k == e.getKey && v == e.getValue
@@ -396,7 +396,7 @@ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends RDD[(Long, Array[Byte]
case Seq(a, b) => (Utils.deserializeLongValue(a), b)
case x => throw new SparkException("PairwiseRDD: unexpected value: " + x)
}
val asJavaPairRDD : JavaPairRDD[Long, Array[Byte]] = JavaPairRDD.fromRDD(this)
val asJavaPairRDD: JavaPairRDD[Long, Array[Byte]] = JavaPairRDD.fromRDD(this)
}

private object SpecialLengths {
@@ -867,7 +867,7 @@ private[spark] object PythonRDD extends Logging {

private
class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
override def call(arr: Array[Byte]) : String = new String(arr, StandardCharsets.UTF_8)
override def call(arr: Array[Byte]): String = new String(arr, StandardCharsets.UTF_8)
}

/**
@@ -95,7 +95,7 @@ private[python] class DoubleArrayToWritableConverter extends Converter[Any, Writ

private[python] class WritableToDoubleArrayConverter extends Converter[Any, Array[Double]] {
override def convert(obj: Any): Array[Double] = obj match {
case daw : DoubleArrayWritable => daw.get().map(_.asInstanceOf[DoubleWritable].get())
case daw: DoubleArrayWritable => daw.get().map(_.asInstanceOf[DoubleWritable].get())
case other => throw new SparkException(s"Data of type $other is not supported")
}
}
@@ -164,7 +164,7 @@ private[r] class RBackendHandler(server: RBackend)
throw new Exception(s"No matched method found for $cls.$methodName")
}

val ret = selectedMethods(index.get).invoke(obj, args : _*)
val ret = selectedMethods(index.get).invoke(obj, args: _*)

// Write status bit
writeInt(dos, 0)
@@ -185,7 +185,7 @@ private[r] class RBackendHandler(server: RBackend)
throw new Exception(s"No matched constructor found for $cls")
}

val obj = ctors(index.get).newInstance(args : _*)
val obj = ctors(index.get).newInstance(args: _*)

writeInt(dos, 0)
writeObject(dos, obj.asInstanceOf[AnyRef], server.jvmObjectTracker)
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -67,7 +67,7 @@ private class PairwiseRRDD[T: ClassTag](
parent, numPartitions, hashFunc, deserializer,
SerializationFormats.BYTE, packageNames,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
lazy val asJavaPairRDD : JavaPairRDD[Int, Array[Byte]] = JavaPairRDD.fromRDD(this)
lazy val asJavaPairRDD: JavaPairRDD[Int, Array[Byte]] = JavaPairRDD.fromRDD(this)
}

/**
Expand All @@ -83,7 +83,7 @@ private class RRDD[T: ClassTag](
extends BaseRRDD[T, Array[Byte]](
parent, -1, func, deserializer, serializer, packageNames,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
lazy val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
lazy val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
}

/**
Expand All @@ -98,7 +98,7 @@ private class StringRRDD[T: ClassTag](
extends BaseRRDD[T, String](
parent, -1, func, deserializer, SerializationFormats.STRING, packageNames,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
lazy val asJavaRDD : JavaRDD[String] = JavaRDD.fromRDD(this)
lazy val asJavaRDD: JavaRDD[String] = JavaRDD.fromRDD(this)
}

private[r] object RRDD {
@@ -52,8 +52,8 @@ private[spark] class BroadcastManager(

private val nextBroadcastId = new AtomicLong(0)

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
def newBroadcast[T: ClassTag](_value: T, isLocal: Boolean): Broadcast[T] = {
Member:

I am not sure about this naming change. I mean, the change does not look particularly better to me. I believe we have an open PR - databricks/scala-style-guide#52 - which apparently argues against some of the cases here.

Contributor:

I think it's not about adding _ to the name of a private val, but about avoiding a name conflict when two variables have a similar meaning. At least this is fine to me; I do this a lot too...
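A minimal sketch of the pattern being discussed (names are illustrative, not the actual Spark signatures):

```scala
// The constructor parameter needs a name distinct from the existing `value`
// member, so it takes an underscore prefix; a trailing underscore would force
// a space before the colon (`value_ : T`), which the new style rule flags.
class Holder[T](_value: T) {
  def value: T = _value
}
```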

Member (@HyukjinKwon, Jun 30, 2017):

I am fine if we prefer a specific way. My only worry is that we happen to change it back again in the future and no one objects, because they don't know which way is preferred...

Member:

Maybe this can be an example that other contributors like me can refer to in the future, BTW.

broadcastFactory.newBroadcast[T](_value, isLocal, nextBroadcastId.getAndIncrement())
}

def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
@@ -30,8 +30,8 @@ private[spark] class TorrentBroadcastFactory extends BroadcastFactory {

override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { }

override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
new TorrentBroadcast[T](value_, id)
override def newBroadcast[T: ClassTag](_value: T, isLocal: Boolean, id: Long): Broadcast[T] = {
new TorrentBroadcast[T](_value, id)
}

override def stop() { }
@@ -89,7 +89,9 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana

/** Clean up all shuffle files associated with an application that has exited. */
def applicationRemoved(appId: String): Unit = {
// scalastyle:off
blockHandler.applicationRemoved(appId, true /* cleanupLocalDirs */)
// scalastyle:on
}

def stop() {
@@ -406,7 +406,7 @@ private object SparkDocker {
new TestWorkerInfo(ip, id, outFile)
}

private def startNode(dockerCmd: ProcessBuilder) : (String, DockerId, File) = {
private def startNode(dockerCmd: ProcessBuilder): (String, DockerId, File) = {
val ipPromise = Promise[String]()
val outFile = File.createTempFile("fault-tolerance-test", "", Utils.createTempDir())
val outStream: FileWriter = new FileWriter(outFile)
@@ -440,7 +440,7 @@ private object Docker extends Logging {
cmd
}

def kill(dockerId: DockerId) : Unit = {
def kill(dockerId: DockerId): Unit = {
"docker kill %s".format(dockerId.id).!
}

@@ -602,7 +602,7 @@ private[history] object ApplicationCacheCheckFilterRelay extends Logging {
* @param cache new cache
*/
def setApplicationCache(cache: ApplicationCache): Unit = {
applicationCache.foreach( c => logWarning(s"Overwriting application cache $c"))
applicationCache.foreach(c => logWarning(s"Overwriting application cache $c"))
applicationCache = Some(cache)
}

@@ -650,13 +650,13 @@ private[history] object ApplicationCacheCheckFilterRelay extends Logging {
def registerFilter(
ui: SparkUI,
appId: String,
attemptId: Option[String] ): Unit = {
attemptId: Option[String]): Unit = {
require(ui != null)
val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST)
val holder = new FilterHolder()
holder.setClassName(FILTER_NAME)
holder.setInitParameter(APP_ID, appId)
attemptId.foreach( id => holder.setInitParameter(ATTEMPT_ID, id))
attemptId.foreach(id => holder.setInitParameter(ATTEMPT_ID, id))
require(ui.getHandlers != null, "null handlers")
ui.getHandlers.foreach { handler =>
handler.addFilter(holder, "/*", enumDispatcher)
@@ -114,7 +114,7 @@ private[deploy] class Worker(
*/
private var masterAddressToConnect: Option[RpcAddress] = None
private var activeMasterUrl: String = ""
private[worker] var activeMasterWebUiUrl : String = ""
private[worker] var activeMasterWebUiUrl: String = ""
private var workerWebUiUrl: String = ""
private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
private var registered = false
@@ -178,7 +178,7 @@ private[deploy] class Worker(
// This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
// So attempting to create and then check if directory was created or not.
workDir.mkdirs()
if ( !workDir.exists() || !workDir.isDirectory) {
if (!workDir.exists() || !workDir.isDirectory) {
logError("Failed to create work directory " + workDir)
System.exit(1)
}
@@ -205,7 +205,7 @@ private[spark] class Executor(
* tasks instead of taking the JVM down.
* @param interruptThread whether to interrupt the task thread
*/
def killAllTasks(interruptThread: Boolean, reason: String) : Unit = {
def killAllTasks(interruptThread: Boolean, reason: String): Unit = {
runningTasks.keys().asScala.foreach(t =>
killTask(t, interruptThread = interruptThread, reason = reason))
}
@@ -29,7 +29,7 @@ import org.apache.spark.metrics.source.Source
private[spark]
class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source {

private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
private def fileStats(scheme: String): Option[FileSystem.Statistics] =
FileSystem.getAllStatistics.asScala.find(s => s.getScheme.equals(scheme))

private def registerFileSystemStat[T](
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -32,7 +32,7 @@ trait Logging {

// Make the log field transient so that objects with Logging can
// be serialized and used on another machine
@transient private var log_ : Logger = null
@transient private var _log: Logger = null

// Method to get the logger name for this object
protected def logName = {
@@ -42,11 +42,11 @@

// Method to get or create the logger for this object
protected def log: Logger = {
if (log_ == null) {
if (_log == null) {
initializeLogIfNecessary(false)
log_ = LoggerFactory.getLogger(logName)
_log = LoggerFactory.getLogger(logName)
}
log_
_log
}

// Log methods that take only a String
@@ -86,7 +86,7 @@ private[spark] abstract class LauncherBackend {
/**
* Callback for when the launcher handle disconnects from this backend.
*/
protected def onDisconnected() : Unit = { }
protected def onDisconnected(): Unit = { }

private def fireStopRequest(): Unit = {
val thread = LauncherBackend.threadFactory.newThread(new Runnable() {
@@ -26,7 +26,7 @@ import org.apache.spark.util.collection.OpenHashMap
/**
* An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
*/
private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, confidence: Double)
private[spark] class GroupedCountEvaluator[T: ClassTag](totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[OpenHashMap[T, Long], Map[T, BoundedDouble]] {

private var outputsMerged = 0
@@ -77,9 +77,9 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
/**
* Transform this PartialResult into a PartialResult of type T.
*/
def map[T](f: R => T) : PartialResult[T] = {
def map[T](f: R => T): PartialResult[T] = {
new PartialResult[T](f(initialVal), isFinal) {
override def getFinalValue() : T = synchronized {
override def getFinalValue(): T = synchronized {
f(PartialResult.this.getFinalValue())
}
override def onComplete(handler: T => Unit): PartialResult[T] = synchronized {
@@ -90,7 +90,7 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
PartialResult.this.onFail(handler)
}
}
override def toString : String = synchronized {
override def toString: String = synchronized {
PartialResult.this.getFinalValueInternal() match {
case Some(value) => "(final: " + f(value) + ")"
case None => "(partial: " + initialValue + ")"
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -48,8 +48,8 @@ class CartesianPartition(
private[spark]
class CartesianRDD[T: ClassTag, U: ClassTag](
sc: SparkContext,
var rdd1 : RDD[T],
var rdd2 : RDD[U])
var rdd1: RDD[T],
var rdd2: RDD[U])
extends RDD[(T, U)](sc, Nil)
with Serializable {

@@ -379,7 +379,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
}
}

def getPartitions: Array[PartitionGroup] = groupArr.filter( pg => pg.numPartitions > 0).toArray
def getPartitions: Array[PartitionGroup] = groupArr.filter(pg => pg.numPartitions > 0).toArray

/**
* Runs the packing algorithm and returns an array of PartitionGroups that if possible are
@@ -134,7 +134,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
}.reduce { (maxmin1, maxmin2) =>
(maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2))
}
if (min.isNaN || max.isNaN || max.isInfinity || min.isInfinity ) {
if (min.isNaN || max.isNaN || max.isInfinity || min.isInfinity) {
throw new UnsupportedOperationException(
"Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
}
@@ -67,7 +67,7 @@ private[spark] class NewHadoopPartition(
*/
@DeveloperApi
class NewHadoopRDD[K, V](
sc : SparkContext,
sc: SparkContext,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
@@ -42,9 +42,9 @@ import org.apache.spark.internal.Logging
* rdd.sortByKey()
* }}}
*/
class OrderedRDDFunctions[K : Ordering : ClassTag,
class OrderedRDDFunctions[K: Ordering: ClassTag,
V: ClassTag,
P <: Product2[K, V] : ClassTag] @DeveloperApi() (
P <: Product2[K, V]: ClassTag] @DeveloperApi() (
self: RDD[P])
extends Logging with Serializable {
private val ordering = implicitly[Ordering[K]]