Skip to content

Commit

Permalink
typelevel#385 - chained ops impl
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-twiner committed Jun 6, 2023
1 parent e257c4c commit 154c65a
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 28 deletions.
5 changes: 5 additions & 0 deletions dataset/src/main/scala/frameless/FramelessSyntax.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package frameless

import frameless.ops.ChainedJoinOps
import org.apache.spark.sql.{Column, DataFrame, Dataset}

trait FramelessSyntax {
Expand All @@ -15,4 +16,8 @@ trait FramelessSyntax {
/** Syntax enrichment that adds `unsafeTyped` to an untyped Spark `DataFrame`. */
implicit class DataframeSyntax(self: DataFrame) {

  /** Wraps this `DataFrame` as a `TypedDataset[T]` by delegating to `TypedDataset.createUnsafe`.
    *
    * NOTE(review): the "unsafe" naming suggests the frame's schema is not verified against `T`
    * at this point - confirm against `TypedDataset.createUnsafe`.
    *
    * @tparam T element type of the resulting dataset; requires a `TypedEncoder[T]` in implicit scope
    * @return the result of `TypedDataset.createUnsafe` applied to the wrapped `DataFrame`
    */
  def unsafeTyped[T: TypedEncoder]: TypedDataset[T] = {
    TypedDataset.createUnsafe(self)
  }
}

/** Syntax enrichment that adds the chained `join` entry point to any `TypedDataset[T]`. */
implicit class ChainedJoinSyntax[T](ds: TypedDataset[T]) {

  /** Starts a chained join between the wrapped dataset and `other`.
    *
    * No join is performed here; the returned `ChainedJoinOps` only captures both datasets so
    * that one of its join methods can be called next.
    *
    * @param other the dataset to join with
    * @tparam U the element type of `other`
    * @return a `ChainedJoinOps` holding the wrapped dataset as `ds` and `other` as `other`
    */
  def join[U](other: TypedDataset[U]): ChainedJoinOps[T, U] = {
    // ChainedJoinOps is a case class, so its companion `apply` is equivalent to `new`.
    ChainedJoinOps(ds, other)
  }
}
}
159 changes: 159 additions & 0 deletions dataset/src/main/scala/frameless/ops/ChainedJoinOps.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package frameless.ops

import frameless.{TypedColumn, TypedDataset, TypedEncoder}

/**
  * Forwarding functions for the join methods of `TypedDataset`, used to chain joins.
  *
  * Every join kind that takes a condition is offered in three forms:
  *  - taking the join condition directly as a `TypedColumn`,
  *  - taking a function from `ds` to the condition, so the left-hand dataset's columns can be
  *    referenced without binding it to a name first,
  *  - taking a function from `(ds, other)` to the condition, giving access to both datasets.
  *
  * Each method simply delegates to the matching `joinXxx` method on `ds`.
  *
  * @param ds the dataset on which .join(other) was called
  * @param other the dataset to which ds is joined
  * @tparam T the type of ds
  * @tparam U the type of other
  */
case class ChainedJoinOps[T, U](ds: TypedDataset[T], other: TypedDataset[U]) {

  // ---------------------------------------------------------------------------------------------
  // right outer join
  // ---------------------------------------------------------------------------------------------

  /** Right outer join of `ds` with `other` on `condition`, forwarded to `ds.joinRight`.
    *
    * Result rows are `(Option[T], U)` pairs.
    */
  def right(condition: TypedColumn[T with U, Boolean])
           (implicit e: TypedEncoder[(Option[T], U)]): TypedDataset[(Option[T], U)] =
    ds.joinRight(other)(condition)

  /** Right outer join of `ds` with `other`, forwarded to `ds.joinRight`.
    *
    * `conditionF` receives `ds` (the current dataset in the chain) and returns the join condition.
    * Result rows are `(Option[T], U)` pairs.
    */
  def right(conditionF: TypedDataset[T] => TypedColumn[T with U, Boolean])
           (implicit e: TypedEncoder[(Option[T], U)]): TypedDataset[(Option[T], U)] = {
    val condition = conditionF(ds)
    ds.joinRight(other)(condition)
  }

  /** Right outer join of `ds` with `other`, forwarded to `ds.joinRight`.
    *
    * `conditionF` receives both `ds` and `other` and returns the join condition.
    * Result rows are `(Option[T], U)` pairs.
    */
  def right(conditionF: (TypedDataset[T], TypedDataset[U]) => TypedColumn[T with U, Boolean])
           (implicit e: TypedEncoder[(Option[T], U)]): TypedDataset[(Option[T], U)] = {
    val condition = conditionF(ds, other)
    ds.joinRight(other)(condition)
  }

  // ---------------------------------------------------------------------------------------------
  // cross join
  // ---------------------------------------------------------------------------------------------

  /** Cartesian product of `ds` with `other`, forwarded to `ds.joinCross`.
    *
    * Takes no condition; provided for completeness alongside the conditional joins.
    */
  def cross()(implicit e: TypedEncoder[(T, U)]): TypedDataset[(T, U)] =
    ds.joinCross(other)

  // ---------------------------------------------------------------------------------------------
  // full outer join
  // ---------------------------------------------------------------------------------------------

  /** Full outer join of `ds` with `other` on `condition`, forwarded to `ds.joinFull`.
    *
    * Result rows are `(Option[T], Option[U])` pairs.
    */
  def full(condition: TypedColumn[T with U, Boolean])
          (implicit e: TypedEncoder[(Option[T], Option[U])]): TypedDataset[(Option[T], Option[U])] =
    ds.joinFull(other)(condition)

  /** Full outer join of `ds` with `other`, forwarded to `ds.joinFull`.
    *
    * `conditionF` receives `ds` (the current dataset in the chain) and returns the join condition.
    * Result rows are `(Option[T], Option[U])` pairs.
    */
  def full(conditionF: TypedDataset[T] => TypedColumn[T with U, Boolean])
          (implicit e: TypedEncoder[(Option[T], Option[U])]): TypedDataset[(Option[T], Option[U])] = {
    val condition = conditionF(ds)
    ds.joinFull(other)(condition)
  }

  /** Full outer join of `ds` with `other`, forwarded to `ds.joinFull`.
    *
    * `conditionF` receives both `ds` and `other` and returns the join condition.
    * Result rows are `(Option[T], Option[U])` pairs.
    */
  def full(conditionF: (TypedDataset[T], TypedDataset[U]) => TypedColumn[T with U, Boolean])
          (implicit e: TypedEncoder[(Option[T], Option[U])]): TypedDataset[(Option[T], Option[U])] = {
    val condition = conditionF(ds, other)
    ds.joinFull(other)(condition)
  }

  // ---------------------------------------------------------------------------------------------
  // inner join
  // ---------------------------------------------------------------------------------------------

  /** Inner join of `ds` with `other` on `condition`, forwarded to `ds.joinInner`.
    *
    * Result rows are `(T, U)` pairs.
    */
  def inner(condition: TypedColumn[T with U, Boolean])
           (implicit e: TypedEncoder[(T, U)]): TypedDataset[(T, U)] =
    ds.joinInner(other)(condition)

  /** Inner join of `ds` with `other`, forwarded to `ds.joinInner`.
    *
    * `conditionF` receives `ds` (the current dataset in the chain) and returns the join condition.
    * Result rows are `(T, U)` pairs.
    */
  def inner(conditionF: TypedDataset[T] => TypedColumn[T with U, Boolean])
           (implicit e: TypedEncoder[(T, U)]): TypedDataset[(T, U)] = {
    val condition = conditionF(ds)
    ds.joinInner(other)(condition)
  }

  /** Inner join of `ds` with `other`, forwarded to `ds.joinInner`.
    *
    * `conditionF` receives both `ds` and `other` and returns the join condition.
    * Result rows are `(T, U)` pairs.
    */
  def inner(conditionF: (TypedDataset[T], TypedDataset[U]) => TypedColumn[T with U, Boolean])
           (implicit e: TypedEncoder[(T, U)]): TypedDataset[(T, U)] = {
    val condition = conditionF(ds, other)
    ds.joinInner(other)(condition)
  }

  // ---------------------------------------------------------------------------------------------
  // left outer join
  // ---------------------------------------------------------------------------------------------

  /** Left outer join of `ds` with `other` on `condition`, forwarded to `ds.joinLeft`.
    *
    * Result rows are `(T, Option[U])` pairs.
    */
  def left(condition: TypedColumn[T with U, Boolean])
          (implicit e: TypedEncoder[(T, Option[U])]): TypedDataset[(T, Option[U])] =
    ds.joinLeft(other)(condition)

  /** Left outer join of `ds` with `other`, forwarded to `ds.joinLeft`.
    *
    * `conditionF` receives `ds` (the current dataset in the chain) and returns the join condition.
    * Result rows are `(T, Option[U])` pairs.
    */
  def left(conditionF: TypedDataset[T] => TypedColumn[T with U, Boolean])
          (implicit e: TypedEncoder[(T, Option[U])]): TypedDataset[(T, Option[U])] = {
    val condition = conditionF(ds)
    ds.joinLeft(other)(condition)
  }

  /** Left outer join of `ds` with `other`, forwarded to `ds.joinLeft`.
    *
    * `conditionF` receives both `ds` and `other` and returns the join condition.
    * Result rows are `(T, Option[U])` pairs.
    */
  def left(conditionF: (TypedDataset[T], TypedDataset[U]) => TypedColumn[T with U, Boolean])
          (implicit e: TypedEncoder[(T, Option[U])]): TypedDataset[(T, Option[U])] = {
    val condition = conditionF(ds, other)
    ds.joinLeft(other)(condition)
  }

  // ---------------------------------------------------------------------------------------------
  // left semi join
  // ---------------------------------------------------------------------------------------------

  /** Left semi join of `ds` with `other` on `condition`, forwarded to `ds.joinLeftSemi`.
    *
    * The result keeps the left-hand element type only: a `TypedDataset[T]`, not a pair.
    */
  def leftSemi(condition: TypedColumn[T with U, Boolean]): TypedDataset[T] =
    ds.joinLeftSemi(other)(condition)

  /** Left semi join of `ds` with `other`, forwarded to `ds.joinLeftSemi`.
    *
    * `conditionF` receives `ds` (the current dataset in the chain) and returns the join condition.
    * The result keeps the left-hand element type only: a `TypedDataset[T]`, not a pair.
    */
  def leftSemi(conditionF: TypedDataset[T] => TypedColumn[T with U, Boolean]): TypedDataset[T] = {
    val condition = conditionF(ds)
    ds.joinLeftSemi(other)(condition)
  }

  /** Left semi join of `ds` with `other`, forwarded to `ds.joinLeftSemi`.
    *
    * `conditionF` receives both `ds` and `other` and returns the join condition.
    * The result keeps the left-hand element type only: a `TypedDataset[T]`, not a pair.
    */
  def leftSemi(conditionF: (TypedDataset[T], TypedDataset[U]) => TypedColumn[T with U, Boolean]): TypedDataset[T] = {
    val condition = conditionF(ds, other)
    ds.joinLeftSemi(other)(condition)
  }

  // ---------------------------------------------------------------------------------------------
  // left anti join
  // ---------------------------------------------------------------------------------------------

  /** Left anti join of `ds` with `other` on `condition`, forwarded to `ds.joinLeftAnti`.
    *
    * The result keeps the left-hand element type only: a `TypedDataset[T]`, not a pair.
    */
  def leftAnti(condition: TypedColumn[T with U, Boolean]): TypedDataset[T] =
    ds.joinLeftAnti(other)(condition)

  /** Left anti join of `ds` with `other`, forwarded to `ds.joinLeftAnti`.
    *
    * `conditionF` receives `ds` (the current dataset in the chain) and returns the join condition.
    * The result keeps the left-hand element type only: a `TypedDataset[T]`, not a pair.
    */
  def leftAnti(conditionF: TypedDataset[T] => TypedColumn[T with U, Boolean]): TypedDataset[T] = {
    val condition = conditionF(ds)
    ds.joinLeftAnti(other)(condition)
  }

  /** Left anti join of `ds` with `other`, forwarded to `ds.joinLeftAnti`.
    *
    * `conditionF` receives both `ds` and `other` and returns the join condition.
    * The result keeps the left-hand element type only: a `TypedDataset[T]`, not a pair.
    */
  def leftAnti(conditionF: (TypedDataset[T], TypedDataset[U]) => TypedColumn[T with U, Boolean]): TypedDataset[T] = {
    val condition = conditionF(ds, other)
    ds.joinLeftAnti(other)(condition)
  }

}
Loading

0 comments on commit 154c65a

Please sign in to comment.