[SPARK-50770][SS] Removing package scope for transformWithState operator APIs

### What changes were proposed in this pull request?

Removing package scope for transformWithState operator APIs

This enables us to release the transformWithState APIs in the next Spark release. While the traits and methods now have expanded visibility, their usage remains blocked behind a flag. The flag will be flipped in a separate PR, and a corresponding behavior change ticket will be filed with it.
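For illustration, once the flag is flipped a caller could invoke the operator along these lines (a minimal sketch: `words` is a hypothetical `Dataset[String]`, `CountProcessor` a hypothetical `StatefulProcessor` subclass; the three-argument signature matches the `KeyValueGroupedDataset` change below):

```scala
import org.apache.spark.sql.streaming.{OutputMode, TimeMode}
import spark.implicits._  // assumes an active SparkSession named `spark`

val counts = words
  .groupByKey(identity)      // key each record by the word itself
  .transformWithState(
    new CountProcessor(),    // hypothetical StatefulProcessor[String, String, (String, Long)]
    TimeMode.None(),         // no timers needed for a plain running count
    OutputMode.Update())
```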

### Why are the changes needed?

To allow use of the transformWithState operator

### Does this PR introduce _any_ user-facing change?

Yes. APIs that were previously `private[sql]` become publicly visible, although usage remains gated behind a flag until it is flipped.

### How was this patch tested?

Existing unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #49417 from ericm-db/enable-tws.

Authored-by: Eric Marnadi <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit be77e1e)
Signed-off-by: Jungtaek Lim <[email protected]>
ericm-db authored and HeartSaVioR committed Jan 24, 2025
1 parent f2765f4 commit f264560
Showing 15 changed files with 22 additions and 42 deletions.
--- FlatMapGroupsWithStateFunction.java
@@ -21,7 +21,6 @@
 import java.util.Iterator;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.streaming.GroupState;
 
 /**
@@ -32,7 +31,6 @@
  * org.apache.spark.sql.Encoder, org.apache.spark.sql.Encoder)}
  * @since 2.1.1
  */
-@Experimental
 @Evolving
 public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
   Iterator<R> call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
--- MapGroupsWithStateFunction.java
@@ -21,7 +21,6 @@
 import java.util.Iterator;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.streaming.GroupState;
 
 /**
@@ -31,7 +30,6 @@
  * MapGroupsWithStateFunction, org.apache.spark.sql.Encoder, org.apache.spark.sql.Encoder)}
  * @since 2.1.1
  */
-@Experimental
 @Evolving
 public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
   R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
--- GroupStateTimeout.java
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.streaming;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.catalyst.plans.logical.*;
 
 /**
@@ -29,7 +28,6 @@
  *
  * @since 2.2.0
  */
-@Experimental
 @Evolving
 public class GroupStateTimeout {
   // NOTE: if you're adding new type of timeout, you should also fix the places below:
--- TimeMode.java
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.streaming;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.catalyst.plans.logical.EventTime$;
 import org.apache.spark.sql.catalyst.plans.logical.NoTime$;
 import org.apache.spark.sql.catalyst.plans.logical.ProcessingTime$;
@@ -27,7 +26,6 @@
  * Represents the time modes (used for specifying timers and ttl) possible for
  * the Dataset operations {@code transformWithState}.
  */
-@Experimental
 @Evolving
 public class TimeMode {
 
--- ExpiredTimerInfo.scala
@@ -19,14 +19,13 @@ package org.apache.spark.sql.streaming
 
 import java.io.Serializable
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 
 /**
  * Class used to provide access to expired timer's expiry time.
  */
-@Experimental
 @Evolving
-private[sql] trait ExpiredTimerInfo extends Serializable {
+trait ExpiredTimerInfo extends Serializable {
 
   /**
    * Get the expired timer's expiry time as milliseconds in epoch time.
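The expiry time exposed here is typically read inside a processor's timer callback. A hedged sketch (the `handleExpiredTimer` hook and `getExpiryTimeInMs()` accessor are assumptions based on the v2 API, not shown in this diff):

```scala
// Inside a hypothetical StatefulProcessor[String, String, (String, Long)]:
override def handleExpiredTimer(
    key: String,
    timerValues: TimerValues,
    expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, Long)] = {
  // Emit the key with the timer's expiry timestamp (epoch millis).
  Iterator.single((key, expiredTimerInfo.getExpiryTimeInMs()))
}
```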
--- GroupState.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
 
 /**
@@ -196,7 +196,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
  * types (see `Encoder` for more details).
  * @since 2.2.0
  */
-@Experimental
 @Evolving
 trait GroupState[S] extends LogicalGroupState[S] {
 
--- ListState.scala
@@ -16,14 +16,13 @@
  */
 package org.apache.spark.sql.streaming
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 
-@Experimental
 @Evolving
 /**
  * Interface used for arbitrary stateful operations with the v2 API to capture list value state.
  */
-private[sql] trait ListState[S] extends Serializable {
+trait ListState[S] extends Serializable {
 
   /** Whether state exists or not. */
   def exists(): Boolean
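To show what this opens up, a short sketch of list-state usage inside a processor (only `exists()` appears in this hunk; `appendValue` and `get` are assumed from the rest of the trait):

```scala
// eventsState: ListState[String], obtained from the handle during init()
if (!eventsState.exists()) {
  eventsState.appendValue("first-event")  // start a list for this key
} else {
  eventsState.get().foreach(println)      // iterate the buffered values
}
```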
--- MapState.scala
@@ -16,9 +16,8 @@
  */
 package org.apache.spark.sql.streaming
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 
-@Experimental
 @Evolving
 /**
  * Interface used for arbitrary stateful operations with the v2 API to capture map value state.
--- QueryInfo.scala
@@ -19,15 +19,14 @@ package org.apache.spark.sql.streaming
 import java.io.Serializable
 import java.util.UUID
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 
 /**
  * Represents the query info provided to the stateful processor used in the arbitrary state API v2
  * to easily identify task retries on the same partition.
  */
-@Experimental
 @Evolving
-private[sql] trait QueryInfo extends Serializable {
+trait QueryInfo extends Serializable {
 
   /** Returns the streaming query id associated with stateful operator */
   def getQueryId: UUID
--- StatefulProcessor.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming
 
 import java.io.Serializable
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 import org.apache.spark.sql.api.EncoderImplicits
 import org.apache.spark.sql.errors.ExecutionErrors
 
@@ -30,9 +30,8 @@ import org.apache.spark.sql.errors.ExecutionErrors
  * Users can also explicitly use `import implicits._` to access the EncoderImplicits and use the
  * state variable APIs relying on implicit encoders.
  */
-@Experimental
 @Evolving
-private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {
+abstract class StatefulProcessor[K, I, O] extends Serializable {
 
   // scalastyle:off
   // Disable style checker so "implicits" object can start with lowercase i
@@ -123,10 +122,8 @@ private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {
  * initial state to be initialized in the first batch. This can be used for starting a new
  * streaming query with existing state from a previous streaming query.
  */
-@Experimental
 @Evolving
-private[sql] abstract class StatefulProcessorWithInitialState[K, I, O, S]
-    extends StatefulProcessor[K, I, O] {
+abstract class StatefulProcessorWithInitialState[K, I, O, S] extends StatefulProcessor[K, I, O] {
 
   /**
    * Function that will be invoked only in the first batch for users to process initial states.
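As a sketch of the now-public surface, a minimal counting processor; the `init`/`handleInputRows` hooks and the handle's `getValueState(stateName, encoder, ttlConfig)` factory are assumptions based on the v2 API, not part of this diff:

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._

class CountProcessor extends StatefulProcessor[String, String, (String, Long)] {
  @transient private var countState: ValueState[Long] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // Create or rehydrate the per-key counter; TTL disabled for this sketch.
    countState = getHandle.getValueState("count", Encoders.scalaLong, TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[(String, Long)] = {
    val prev = if (countState.exists()) countState.get() else 0L
    val updated = prev + inputRows.size
    countState.update(updated)
    Iterator.single((key, updated))
  }
}
```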
--- StatefulProcessorHandle.scala
@@ -18,16 +18,15 @@ package org.apache.spark.sql.streaming
 
 import java.io.Serializable
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 import org.apache.spark.sql.Encoder
 
 /**
  * Represents the operation handle provided to the stateful processor used in the arbitrary state
  * API v2.
  */
-@Experimental
 @Evolving
-private[sql] trait StatefulProcessorHandle extends Serializable {
+trait StatefulProcessorHandle extends Serializable {
 
   /**
    * Function to create new or return existing single value state variable of given type with ttl.
--- TimerValues.scala
@@ -19,15 +19,14 @@ package org.apache.spark.sql.streaming
 
 import java.io.Serializable
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 
 /**
  * Class used to provide access to timer values for processing and event time populated before
  * method invocations using the arbitrary state API v2.
  */
-@Experimental
 @Evolving
-private[sql] trait TimerValues extends Serializable {
+trait TimerValues extends Serializable {
 
   /**
    * Get the current processing time as milliseconds in epoch time.
--- ValueState.scala
@@ -19,14 +19,13 @@ package org.apache.spark.sql.streaming
 
 import java.io.Serializable
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 
-@Experimental
 @Evolving
 /**
  * Interface used for arbitrary stateful operations with the v2 API to capture single value state.
  */
-private[sql] trait ValueState[S] extends Serializable {
+trait ValueState[S] extends Serializable {
 
   /** Whether state exists or not. */
   def exists(): Boolean
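A short sketch of the single-value operations (only `exists()` appears in this hunk; `get`, `update`, and `clear` are assumed from the same trait):

```scala
// countState: ValueState[Long], created from the processor handle
val next = if (countState.exists()) countState.get() + 1L else 1L
countState.update(next)               // persist the new value for this key
if (next > 100L) countState.clear()   // drop state once no longer needed
```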
--- KeyValueGroupedDataset.scala
@@ -201,7 +201,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /** @inheritdoc */
-  private[sql] def transformWithState[U: Encoder](
+  def transformWithState[U: Encoder](
       statefulProcessor: StatefulProcessor[K, V, U],
       timeMode: TimeMode,
       outputMode: OutputMode): Dataset[U] = {
@@ -219,7 +219,7 @@
   }
 
   /** @inheritdoc */
-  private[sql] def transformWithState[U: Encoder](
+  def transformWithState[U: Encoder](
       statefulProcessor: StatefulProcessor[K, V, U],
       eventTimeColumnName: String,
       outputMode: OutputMode): Dataset[U] = {
@@ -235,7 +235,7 @@
   }
 
   /** @inheritdoc */
-  private[sql] def transformWithState[U: Encoder, S: Encoder](
+  def transformWithState[U: Encoder, S: Encoder](
       statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
       timeMode: TimeMode,
       outputMode: OutputMode,
@@ -257,7 +257,7 @@
   }
 
   /** @inheritdoc */
-  private[sql] def transformWithState[U: Encoder, S: Encoder](
+  def transformWithState[U: Encoder, S: Encoder](
      statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
       eventTimeColumnName: String,
       outputMode: OutputMode,
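The `StatefulProcessorWithInitialState` overloads let a new query seed state from a prior one; a hedged call-site sketch (the trailing `initialState` parameter is assumed, since the hunk truncates before it):

```scala
// initialState: a hypothetical KeyValueGroupedDataset[String, Long] from a previous run
val restored = words
  .groupByKey(identity)
  .transformWithState(
    new CountProcessorWithInit(),  // hypothetical StatefulProcessorWithInitialState
    TimeMode.None(),
    OutputMode.Update(),
    initialState)
```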
--- TestGroupState.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 import org.apache.spark.api.java.Optional
 import org.apache.spark.sql.execution.streaming.GroupStateImpl
 import org.apache.spark.sql.execution.streaming.GroupStateImpl._
@@ -114,7 +114,6 @@ import org.apache.spark.sql.execution.streaming.GroupStateImpl._
  * Spark SQL types (see `Encoder` for more details).
  * @since 3.2.0
  */
-@Experimental
 @Evolving
 trait TestGroupState[S] extends GroupState[S] {
   /** Whether the state has been marked for removing */
