@@ -28,7 +28,7 @@ import io.github.mandar2812.dynaml.models.sgp.ESGPModel
2828import io .github .mandar2812 .dynaml .optimization ._
2929import io .github .mandar2812 .dynaml .pipes ._
3030import io .github .mandar2812 .dynaml .probability .ContinuousDistrRV
31- import io .github .mandar2812 .dynaml .utils .{ GaussianScaler , MVGaussianScaler , MeanScaler , MinMaxScaler }
31+ import io .github .mandar2812 .dynaml .utils ._
3232import io .github .mandar2812 .dynaml .wavelets .{GroupedHaarWaveletFilter , HaarWaveletFilter , InvGroupedHaarWaveletFilter , InverseHaarWaveletFilter }
3333import org .apache .log4j .Logger
3434import org .apache .spark .rdd .RDD
@@ -436,6 +436,53 @@ object DynaMLPipe {
436436 (result, (featuresScaler, targetsScaler))
437437 })
438438
/**
  * Returns a pipe which performs PCA on data features and gaussian scaling on data targets.
  *
  * @param standardize Set to true if one wants the standardized data and false if one
  *                    wants the original data along with the [[PCAScaler]] and
  *                    [[MVGaussianScaler]] instances.
  * */
def calculatePCAScales(standardize: Boolean = true): DataPipe[
  Stream[(DenseVector[Double], DenseVector[Double])],
  (Stream[(DenseVector[Double], DenseVector[Double])], (PCAScaler, MVGaussianScaler))] =
  DataPipe((data: Stream[(DenseVector[Double], DenseVector[Double])]) => {

    // Dimensionality of the feature and target spaces, read off the first pattern.
    // NOTE(review): assumes `data` is non-empty; `head` throws otherwise — confirm with callers.
    val (num_features, num_targets) = (data.head._1.length, data.head._2.length)

    // Joint mean and covariance of the concatenated (features ++ targets) vectors.
    val (m, sigma) = utils.getStatsMult(data.map(tup =>
      DenseVector(tup._1.toArray ++ tup._2.toArray)).toList)

    // PCA transform built from the feature block of the joint statistics.
    val featuresScaler = PCAScaler(
      m(0 until num_features),
      sigma(0 until num_features, 0 until num_features))

    // Gaussian scaler built from the target block of the joint statistics.
    val targetsScaler = MVGaussianScaler(
      m(num_features until num_features + num_targets),
      sigma(num_features until num_features + num_targets, num_features until num_features + num_targets))

    // Only apply the scalers when standardization is requested; the scaler pair is returned either way.
    val result = if (standardize) (featuresScaler * targetsScaler)(data) else data

    (result, (featuresScaler, targetsScaler))
  })
466+
/**
  * Returns a pipe which performs PCA on data features
  * (features-only variant: no targets are involved).
  *
  * @param standardize Set to true if one wants the standardized data and false if one
  *                    wants the original data along with the [[PCAScaler]] instance.
  * */
def calculatePCAScalesFeatures(standardize: Boolean = true): DataPipe[
  Stream[DenseVector[Double]],
  (Stream[DenseVector[Double]], PCAScaler)] =
  DataPipe((data: Stream[DenseVector[Double]]) => {

    // Sample mean and covariance of the feature vectors.
    val (m, sigma) = utils.getStatsMult(data.toList)

    val featuresScaler = PCAScaler(m, sigma)

    // Only apply the transform when standardization is requested; the scaler is returned either way.
    val result = if (standardize) featuresScaler(data) else data

    (result, featuresScaler)
  })
485+
439486 /**
440487 * Returns a pipe which takes a data set and calculates the minimum and maximum of each dimension.
441488 * @param standardize Set to true if one wants the standardized data and false if one
@@ -520,6 +567,16 @@ object DynaMLPipe {
520567 (calculateMVGaussianScales()* identityPipe[Stream [(DenseVector [Double ], DenseVector [Double ])]]) >
521568 scaleTestPipe[DenseVector [Double ], MVGaussianScaler ]
522569
/**
  * Transform a data set by performing PCA on its patterns.
  * */
val pcaFeatureScaling: DataPipe[
  Stream[DenseVector[Double]],
  (Stream[DenseVector[Double]], PCAScaler)] = calculatePCAScalesFeatures()
574+
/**
  * Transform a data set consisting of features and targets.
  * Perform PCA scaling of features and gaussian scaling of targets.
  * */
val pcaScaling: DataPipe[
  Stream[(DenseVector[Double], DenseVector[Double])],
  (Stream[(DenseVector[Double], DenseVector[Double])], (PCAScaler, MVGaussianScaler))] = calculatePCAScales()
523580
524581 /**
525582 * Scale a data set which is stored as a [[Stream ]],
0 commit comments