Update scalafmt-core to 3.8.5 #862

Status: Closed. Wants to merge 3 commits.
2 changes: 2 additions & 0 deletions .git-blame-ignore-revs
@@ -0,0 +1,2 @@
+# Scala Steward: Reformat with scalafmt 3.8.5
+71641698f9ffe2e123410ff46641f7d38b6b986d
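
Note: .git-blame-ignore-revs feeds Git's ignore-revs mechanism, so the bulk reformatting commit listed above is skipped when assigning blame. To pick it up locally (standard Git 2.23+ behavior, not part of this PR), run: git config blame.ignoreRevsFile .git-blame-ignore-revs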
2 changes: 1 addition & 1 deletion .scalafmt.conf
@@ -1,4 +1,4 @@
-version = 3.8.1
+version = 3.8.5
runner.dialect = scala213

newlines.beforeMultilineDef = keep
5 changes: 4 additions & 1 deletion build.sbt
@@ -244,7 +244,10 @@ lazy val datasetSettings =
)
},
coverageExcludedPackages := "org.apache.spark.sql.reflection",
-libraryDependencies += "com.globalmentor" % "hadoop-bare-naked-local-fs" % nakedFSVersion % Test exclude ("org.apache.hadoop", "hadoop-commons")
+libraryDependencies += "com.globalmentor" % "hadoop-bare-naked-local-fs" % nakedFSVersion % Test exclude (
+"org.apache.hadoop",
+"hadoop-commons"
+)
)

lazy val refinedSettings =
17 changes: 12 additions & 5 deletions cats/src/main/scala/frameless/cats/FramelessSyntax.scala
@@ -7,18 +7,25 @@ import _root_.cats.mtl.Ask
import org.apache.spark.sql.SparkSession

trait FramelessSyntax extends frameless.FramelessSyntax {
-implicit class SparkJobOps[F[_], A](fa: F[A])(implicit S: Sync[F], A: Ask[F, SparkSession]) {
+
+implicit class SparkJobOps[F[_], A](
+fa: F[A]
+)(implicit
+S: Sync[F],
+A: Ask[F, SparkSession]) {
import S._, A._

def withLocalProperty(key: String, value: String): F[A] =
for {
session <- ask
-_ <- delay(session.sparkContext.setLocalProperty(key, value))
-a <- fa
+_ <- delay(session.sparkContext.setLocalProperty(key, value))
+a <- fa
} yield a

-def withGroupId(groupId: String): F[A] = withLocalProperty("spark.jobGroup.id", groupId)
+def withGroupId(groupId: String): F[A] =
+withLocalProperty("spark.jobGroup.id", groupId)

-def withDescription(description: String): F[A] = withLocalProperty("spark.job.description", description)
+def withDescription(description: String): F[A] =
+withLocalProperty("spark.job.description", description)
}
}
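
Note: the reformatted SparkJobOps syntax is exercised in FramelessSyntaxTests further down. As a rough sketch of how the enrichment is used (names and values here are illustrative, not from this PR; assumes the kind-projector plugin for the * syntax, as in the tests):

import _root_.cats.data.ReaderT
import _root_.cats.effect.IO
import _root_.cats.syntax.all._
import frameless.cats.implicits._
import org.apache.spark.sql.SparkSession

// Run a pure value as a Spark job tagged with a group id and description,
// reading the ambient SparkSession through ReaderT.
def taggedJob(session: SparkSession): IO[Int] =
  1.pure[ReaderT[IO, SparkSession, *]]
    .withGroupId("example-group")
    .withDescription("example job description")
    .run(session)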
12 changes: 10 additions & 2 deletions cats/src/main/scala/frameless/cats/SparkDelayInstances.scala
@@ -5,7 +5,15 @@ import _root_.cats.effect.Sync
import org.apache.spark.sql.SparkSession

trait SparkDelayInstances {
-implicit def framelessCatsSparkDelayForSync[F[_]](implicit S: Sync[F]): SparkDelay[F] = new SparkDelay[F] {
-def delay[A](a: => A)(implicit spark: SparkSession): F[A] = S.delay(a)
+
+implicit def framelessCatsSparkDelayForSync[F[_]](
+implicit
+S: Sync[F]
+): SparkDelay[F] = new SparkDelay[F] {
+def delay[A](
+a: => A
+)(implicit
+spark: SparkSession
+): F[A] = S.delay(a)
}
}
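
Note: this instance is what lets TypedDataset actions run inside any Sync effect. A minimal sketch (hypothetical dataset, assuming an implicit SparkSession in scope, mirroring the collect[IO]() calls in the tests below):

import _root_.cats.effect.IO
import frameless.TypedDataset
import frameless.cats.implicits._
import org.apache.spark.sql.SparkSession

// collect is suspended in IO via the Sync-derived SparkDelay
// instead of running eagerly.
def rows(ds: TypedDataset[(Int, String)])(implicit s: SparkSession): IO[Seq[(Int, String)]] =
  ds.collect[IO]()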
1 change: 1 addition & 0 deletions cats/src/main/scala/frameless/cats/SparkTask.scala
@@ -6,6 +6,7 @@ import _root_.cats.data.Kleisli
import org.apache.spark.SparkContext

object SparkTask {
+
def apply[A](f: SparkContext => A): SparkTask[A] =
Kleisli[Id, SparkContext, A](f)

74 changes: 59 additions & 15 deletions cats/src/main/scala/frameless/cats/implicits.scala
@@ -2,73 +2,117 @@ package frameless
package cats

import _root_.cats._
-import _root_.cats.kernel.{CommutativeMonoid, CommutativeSemigroup}
+import _root_.cats.kernel.{ CommutativeMonoid, CommutativeSemigroup }
import _root_.cats.syntax.all._
import alleycats.Empty

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object implicits extends FramelessSyntax with SparkDelayInstances {
+
implicit class rddOps[A: ClassTag](lhs: RDD[A]) {
-def csum(implicit m: CommutativeMonoid[A]): A =
+
+def csum(
+implicit
+m: CommutativeMonoid[A]
+): A =
lhs.fold(m.empty)(_ |+| _)
-def csumOption(implicit m: CommutativeSemigroup[A]): Option[A] =
+
+def csumOption(
+implicit
+m: CommutativeSemigroup[A]
+): Option[A] =
lhs.aggregate[Option[A]](None)(
(acc, a) => Some(acc.fold(a)(_ |+| a)),
(l, r) => l.fold(r)(x => r.map(_ |+| x) orElse Some(x))
)

-def cmin(implicit o: Order[A], e: Empty[A]): A = {
+def cmin(implicit
+o: Order[A],
+e: Empty[A]
+): A = {
if (lhs.isEmpty()) e.empty
else lhs.reduce(_ min _)
}
-def cminOption(implicit o: Order[A]): Option[A] =
+
+def cminOption(
+implicit
+o: Order[A]
+): Option[A] =
csumOption(new CommutativeSemigroup[A] {
def combine(l: A, r: A) = l min r
})

-def cmax(implicit o: Order[A], e: Empty[A]): A = {
+def cmax(implicit
+o: Order[A],
+e: Empty[A]
+): A = {
if (lhs.isEmpty()) e.empty
else lhs.reduce(_ max _)
}
-def cmaxOption(implicit o: Order[A]): Option[A] =
+
+def cmaxOption(
+implicit
+o: Order[A]
+): Option[A] =
csumOption(new CommutativeSemigroup[A] {
def combine(l: A, r: A) = l max r
})
}

implicit class pairRddOps[K: ClassTag, V: ClassTag](lhs: RDD[(K, V)]) {
-def csumByKey(implicit m: CommutativeSemigroup[V]): RDD[(K, V)] = lhs.reduceByKey(_ |+| _)
-def cminByKey(implicit o: Order[V]): RDD[(K, V)] = lhs.reduceByKey(_ min _)
-def cmaxByKey(implicit o: Order[V]): RDD[(K, V)] = lhs.reduceByKey(_ max _)
+
+def csumByKey(
+implicit
+m: CommutativeSemigroup[V]
+): RDD[(K, V)] = lhs.reduceByKey(_ |+| _)
+
+def cminByKey(
+implicit
+o: Order[V]
+): RDD[(K, V)] = lhs.reduceByKey(_ min _)
+
+def cmaxByKey(
+implicit
+o: Order[V]
+): RDD[(K, V)] = lhs.reduceByKey(_ max _)
}
}

object union {
+
implicit def unionSemigroup[A]: Semigroup[RDD[A]] =
new Semigroup[RDD[A]] {
def combine(lhs: RDD[A], rhs: RDD[A]): RDD[A] = lhs union rhs
}
}

object inner {
-implicit def pairwiseInnerSemigroup[K: ClassTag, V: ClassTag: Semigroup]: Semigroup[RDD[(K, V)]] =
+
+implicit def pairwiseInnerSemigroup[
+K: ClassTag,
+V: ClassTag: Semigroup
+]: Semigroup[RDD[(K, V)]] =
new Semigroup[RDD[(K, V)]] {
def combine(lhs: RDD[(K, V)], rhs: RDD[(K, V)]): RDD[(K, V)] =
lhs.join(rhs).mapValues { case (x, y) => x |+| y }
}
}

object outer {
-implicit def pairwiseOuterSemigroup[K: ClassTag, V: ClassTag](implicit m: Monoid[V]): Semigroup[RDD[(K, V)]] =
+
+implicit def pairwiseOuterSemigroup[K: ClassTag, V: ClassTag](
+implicit
+m: Monoid[V]
+): Semigroup[RDD[(K, V)]] =
new Semigroup[RDD[(K, V)]] {
def combine(lhs: RDD[(K, V)], rhs: RDD[(K, V)]): RDD[(K, V)] =
lhs.fullOuterJoin(rhs).mapValues {
case (Some(x), Some(y)) => x |+| y
-case (None, Some(y)) => y
-case (Some(x), None) => x
-case (None, None) => m.empty
+case (None, Some(y)) => y
+case (Some(x), None) => x
+case (None, None) => m.empty
}
}
}
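
Note: a short usage sketch of the enrichments reformatted above (illustrative, assumes an active SparkContext; the same csumByKey/cminByKey calls appear in the test file at the bottom of this diff):

import frameless.cats.implicits._
import org.apache.spark.SparkContext

// Commutative-semigroup aggregation per key over an RDD of pairs.
def perKeyTotals(sc: SparkContext): Seq[(String, Int)] = {
  val rdd = sc.parallelize(Seq(("a", 2), ("b", 3), ("b", 2)))
  rdd.csumByKey.collect().toSeq // contains ("a", 2) and ("b", 5)
}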
24 changes: 16 additions & 8 deletions cats/src/test/scala/frameless/cats/FramelessSyntaxTests.scala
@@ -6,16 +6,18 @@ import _root_.cats.effect.IO
import _root_.cats.effect.unsafe.implicits.global
import org.apache.spark.sql.SparkSession
import org.scalatest.matchers.should.Matchers
-import org.scalacheck.{Test => PTest}
+import org.scalacheck.{ Test => PTest }
import org.scalacheck.Prop, Prop._
import org.scalacheck.effect.PropF, PropF._

class FramelessSyntaxTests extends TypedDatasetSuite with Matchers {
override val sparkDelay = null

-def prop[A, B](data: Vector[X2[A, B]])(
-implicit ev: TypedEncoder[X2[A, B]]
-): Prop = {
+def prop[A, B](
+data: Vector[X2[A, B]]
+)(implicit
+ev: TypedEncoder[X2[A, B]]
+): Prop = {
import implicits._

val dataset = TypedDataset.create(data).dataset
@@ -24,7 +26,13 @@ class FramelessSyntaxTests extends TypedDatasetSuite with Matchers {
val typedDataset = dataset.typed
val typedDatasetFromDataFrame = dataframe.unsafeTyped[X2[A, B]]

-typedDataset.collect[IO]().unsafeRunSync().toVector ?= typedDatasetFromDataFrame.collect[IO]().unsafeRunSync().toVector
+typedDataset
+.collect[IO]()
+.unsafeRunSync()
+.toVector ?= typedDatasetFromDataFrame
+.collect[IO]()
+.unsafeRunSync()
+.toVector
}

test("dataset typed - toTyped") {
@@ -37,8 +45,7 @@ class FramelessSyntaxTests extends TypedDatasetSuite with Matchers {

forAllF { (k: String, v: String) =>
val scopedKey = "frameless.tests." + k
-1
-.pure[ReaderT[IO, SparkSession, *]]
+1.pure[ReaderT[IO, SparkSession, *]]
.withLocalProperty(scopedKey, v)
.withGroupId(v)
.withDescription(v)
@@ -47,7 +54,8 @@ class FramelessSyntaxTests extends TypedDatasetSuite with Matchers {
sc.getLocalProperty(scopedKey) shouldBe v
sc.getLocalProperty("spark.jobGroup.id") shouldBe v
sc.getLocalProperty("spark.job.description") shouldBe v
-}.void
+}
+.void
}.check().unsafeRunSync().status shouldBe PTest.Passed
}
}
65 changes: 51 additions & 14 deletions cats/src/test/scala/frameless/cats/test.scala
@@ -7,7 +7,7 @@ import _root_.cats.syntax.all._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
-import org.apache.spark.{SparkConf, SparkContext => SC}
+import org.apache.spark.{ SparkConf, SparkContext => SC }

import org.scalatest.compatible.Assertion
import org.scalactic.anyvals.PosInt
@@ -21,24 +21,39 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.propspec.AnyPropSpec

trait SparkTests {
-val appID: String = new java.util.Date().toString + math.floor(math.random() * 10E4).toLong.toString
+
+val appID: String = new java.util.Date().toString + math
+.floor(math.random() * 10e4)
+.toLong
+.toString

val conf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("test")
.set("spark.ui.enabled", "false")
.set("spark.app.id", appID)

-implicit def session: SparkSession = SparkSession.builder().config(conf).getOrCreate()
+implicit def session: SparkSession =
+SparkSession.builder().config(conf).getOrCreate()
implicit def sc: SparkContext = session.sparkContext

-implicit class seqToRdd[A: ClassTag](seq: Seq[A])(implicit sc: SC) {
+implicit class seqToRdd[A: ClassTag](
+seq: Seq[A]
+)(implicit
+sc: SC) {
def toRdd: RDD[A] = sc.makeRDD(seq)
}
}

object Tests {
-def innerPairwise(mx: Map[String, Int], my: Map[String, Int], check: (Any, Any) => Assertion)(implicit sc: SC): Assertion = {
+
+def innerPairwise(
+mx: Map[String, Int],
+my: Map[String, Int],
+check: (Any, Any) => Assertion
+)(implicit
+sc: SC
+): Assertion = {
import frameless.cats.implicits._
import frameless.cats.inner._
val xs = sc.parallelize(mx.toSeq)
@@ -63,18 +78,27 @@ object Tests {
}
}

-class Test extends AnyPropSpec with Matchers with ScalaCheckPropertyChecks with SparkTests {
+class Test
+extends AnyPropSpec
+with Matchers
+with ScalaCheckPropertyChecks
+with SparkTests {
+
implicit override val generatorDrivenConfig =
PropertyCheckConfiguration(minSize = PosInt(10))

property("spark is working") {
-sc.parallelize(Seq(1, 2, 3)).collect() shouldBe Array(1,2,3)
+sc.parallelize(Seq(1, 2, 3)).collect() shouldBe Array(1, 2, 3)
}

property("inner pairwise monoid") {
// Make sure we have non-empty map
-forAll { (xh: (String, Int), mx: Map[String, Int], yh: (String, Int), my: Map[String, Int]) =>
-Tests.innerPairwise(mx + xh, my + yh, _ shouldBe _)
+forAll {
+(xh: (String, Int),
+mx: Map[String, Int],
+yh: (String, Int),
+my: Map[String, Int]
+) => Tests.innerPairwise(mx + xh, my + yh, _ shouldBe _)
}
}

@@ -110,7 +134,8 @@ class Test extends AnyPropSpec with Matchers with ScalaCheckPropertyChecks with
property("rdd tuple commutative semigroup example") {
import frameless.cats.implicits._
forAll { seq: List[(Int, Int)] =>
-val expectedSum = if (seq.isEmpty) None else Some(Foldable[List].fold(seq))
+val expectedSum =
+if (seq.isEmpty) None else Some(Foldable[List].fold(seq))
val rdd = seq.toRdd

rdd.csum shouldBe expectedSum.getOrElse(0 -> 0)
@@ -120,10 +145,22 @@ class Test extends AnyPropSpec with Matchers with ScalaCheckPropertyChecks with

property("pair rdd numeric commutative semigroup example") {
import frameless.cats.implicits._
-val seq = Seq( ("a",2), ("b",3), ("d",6), ("b",2), ("d",1) )
+val seq = Seq(("a", 2), ("b", 3), ("d", 6), ("b", 2), ("d", 1))
val rdd = seq.toRdd
-rdd.cminByKey.collect().toSeq should contain theSameElementsAs Seq( ("a",2), ("b",2), ("d",1) )
-rdd.cmaxByKey.collect().toSeq should contain theSameElementsAs Seq( ("a",2), ("b",3), ("d",6) )
-rdd.csumByKey.collect().toSeq should contain theSameElementsAs Seq( ("a",2), ("b",5), ("d",7) )
+rdd.cminByKey.collect().toSeq should contain theSameElementsAs Seq(
+("a", 2),
+("b", 2),
+("d", 1)
+)
+rdd.cmaxByKey.collect().toSeq should contain theSameElementsAs Seq(
+("a", 2),
+("b", 3),
+("d", 6)
+)
+rdd.csumByKey.collect().toSeq should contain theSameElementsAs Seq(
+("a", 2),
+("b", 5),
+("d", 7)
+)
}
}