[SPARK-50770][SS] Removing package scope for transformWithState operator APIs #49417
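Summary (reconstructed from the diff below): removes the private[sql] package scoping from the transformWithState operator APIs (StatefulProcessor, StatefulProcessorWithInitialState, StatefulProcessorHandle, ValueState, ListState, QueryInfo, TimerValues, ExpiredTimerInfo, and the KeyValueGroupedDataset.transformWithState overloads), and drops the @Experimental annotation from these and the related stateful streaming classes (GroupState, GroupStateTimeout, TimeMode, TestGroupState, and the flatMapGroupsWithState/mapGroupsWithState function interfaces), leaving them marked @Evolving only.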

Closed · 5 commits
FlatMapGroupsWithStateFunction.java
@@ -21,7 +21,6 @@
 import java.util.Iterator;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.streaming.GroupState;
 
 /**
@@ -32,7 +31,6 @@
  * org.apache.spark.sql.Encoder, org.apache.spark.sql.Encoder)}
  * @since 2.1.1
  */
-@Experimental
 @Evolving
 public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
   Iterator<R> call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
MapGroupsWithStateFunction.java
@@ -21,7 +21,6 @@
 import java.util.Iterator;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.streaming.GroupState;
 
 /**
@@ -31,7 +30,6 @@
  * MapGroupsWithStateFunction, org.apache.spark.sql.Encoder, org.apache.spark.sql.Encoder)}
  * @since 2.1.1
  */
-@Experimental
 @Evolving
 public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
   R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
GroupStateTimeout.java
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.streaming;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.catalyst.plans.logical.*;
 
 /**
@@ -29,7 +28,6 @@
  *
  * @since 2.2.0
  */
-@Experimental
 @Evolving
 public class GroupStateTimeout {
   // NOTE: if you're adding new type of timeout, you should also fix the places below:
TimeMode.java
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.streaming;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.catalyst.plans.logical.EventTime$;
 import org.apache.spark.sql.catalyst.plans.logical.NoTime$;
 import org.apache.spark.sql.catalyst.plans.logical.ProcessingTime$;
@@ -27,7 +26,6 @@
  * Represents the time modes (used for specifying timers and ttl) possible for
  * the Dataset operations {@code transformWithState}.
  */
-@Experimental
 @Evolving
 public class TimeMode {
 
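For orientation, the modes above are selected through TimeMode's static factory methods; a minimal Scala sketch (variable names are illustrative only):

// Minimal sketch: picking a time mode for transformWithState.
// TimeMode.None disables timers; ProcessingTime and EventTime choose
// which clock drives any timers the processor registers.
import org.apache.spark.sql.streaming.TimeMode

val noTimers  = TimeMode.None()
val procTime  = TimeMode.ProcessingTime()
val eventTime = TimeMode.EventTime()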
ExpiredTimerInfo.scala
@@ -19,14 +19,13 @@ package org.apache.spark.sql.streaming
 
 import java.io.Serializable
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 
 /**
  * Class used to provide access to expired timer's expiry time.
  */
-@Experimental
 @Evolving
-private[sql] trait ExpiredTimerInfo extends Serializable {
+trait ExpiredTimerInfo extends Serializable {
 
   /**
    * Get the expired timer's expiry time as milliseconds in epoch time.
GroupState.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
 
 /**
@@ -196,7 +196,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
  * types (see `Encoder` for more details).
  * @since 2.2.0
  */
-@Experimental
 @Evolving
 trait GroupState[S] extends LogicalGroupState[S] {
 
ListState.scala
@@ -16,14 +16,13 @@
  */
 package org.apache.spark.sql.streaming
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 
-@Experimental
 @Evolving
 /**
  * Interface used for arbitrary stateful operations with the v2 API to capture list value state.
  */
-private[sql] trait ListState[S] extends Serializable {
+trait ListState[S] extends Serializable {
 
   /** Whether state exists or not. */
   def exists(): Boolean
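To illustrate what user code can now touch directly, a rough sketch of the ListState operations (assuming the Spark 4.0 surface of exists/get/put/appendValue/clear; eventsState is a hypothetical variable created via getHandle.getListState in a processor's init):

// Sketch only: typical ListState[String] usage inside handleInputRows.
val priorEvents: Iterator[String] =
  if (eventsState.exists()) eventsState.get() else Iterator.empty
eventsState.appendValue("new-event")     // append one element to the list
eventsState.put(Array("a", "b", "c"))    // replace the entire list
eventsState.clear()                      // remove all state for this key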
MapState.scala
@@ -16,9 +16,8 @@
  */
 package org.apache.spark.sql.streaming
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 
-@Experimental
 @Evolving
 /**
  * Interface used for arbitrary stateful operations with the v2 API to capture map value state.
QueryInfo.scala
@@ -19,15 +19,14 @@ package org.apache.spark.sql.streaming
 import java.io.Serializable
 import java.util.UUID
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 
 /**
  * Represents the query info provided to the stateful processor used in the arbitrary state API v2
  * to easily identify task retries on the same partition.
  */
-@Experimental
 @Evolving
-private[sql] trait QueryInfo extends Serializable {
+trait QueryInfo extends Serializable {
 
   /** Returns the streaming query id associated with stateful operator */
   def getQueryId: UUID
StatefulProcessor.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming
 
 import java.io.Serializable
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 import org.apache.spark.sql.api.EncoderImplicits
 import org.apache.spark.sql.errors.ExecutionErrors
 
@@ -30,9 +30,8 @@ import org.apache.spark.sql.errors.ExecutionErrors
  * Users can also explicitly use `import implicits._` to access the EncoderImplicits and use the
  * state variable APIs relying on implicit encoders.
  */
-@Experimental
 @Evolving
-private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {
+abstract class StatefulProcessor[K, I, O] extends Serializable {
 
   // scalastyle:off
   // Disable style checker so "implicits" object can start with lowercase i
@@ -123,10 +122,8 @@ private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {
  * initial state to be initialized in the first batch. This can be used for starting a new
  * streaming query with existing state from a previous streaming query.
  */
-@Experimental
 @Evolving
-private[sql] abstract class StatefulProcessorWithInitialState[K, I, O, S]
-  extends StatefulProcessor[K, I, O] {
+abstract class StatefulProcessorWithInitialState[K, I, O, S] extends StatefulProcessor[K, I, O] {
 
   /**
    * Function that will be invoked only in the first batch for users to process initial states.
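For concreteness, a minimal sketch of a processor written against the now-public classes (hypothetical RunningCountProcessor; assumes the Spark 4.0 signatures for init/handleInputRows and for getValueState with a TTLConfig):

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._

// Hypothetical example: keeps a per-key running count in a ValueState.
class RunningCountProcessor extends StatefulProcessor[String, String, (String, Long)] {
  @transient private var count: ValueState[Long] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // State variables are created through the (now public) handle.
    count = getHandle.getValueState[Long]("count", Encoders.scalaLong, TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[(String, Long)] = {
    val updated = (if (count.exists()) count.get() else 0L) + inputRows.size
    count.update(updated)
    Iterator.single((key, updated))
  }
}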
StatefulProcessorHandle.scala
@@ -18,16 +18,15 @@ package org.apache.spark.sql.streaming
 
 import java.io.Serializable
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 import org.apache.spark.sql.Encoder
 
 /**
  * Represents the operation handle provided to the stateful processor used in the arbitrary state
  * API v2.
  */
-@Experimental
 @Evolving
-private[sql] trait StatefulProcessorHandle extends Serializable {
+trait StatefulProcessorHandle extends Serializable {
 
   /**
    * Function to create new or return existing single value state variable of given type with ttl.
TimerValues.scala
@@ -19,15 +19,14 @@ package org.apache.spark.sql.streaming
 
 import java.io.Serializable
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 
 /**
  * Class used to provide access to timer values for processing and event time populated before
  * method invocations using the arbitrary state API v2.
  */
-@Experimental
 @Evolving
-private[sql] trait TimerValues extends Serializable {
+trait TimerValues extends Serializable {
 
   /**
    * Get the current processing time as milliseconds in epoch time.
ValueState.scala
@@ -19,14 +19,13 @@ package org.apache.spark.sql.streaming
 
 import java.io.Serializable
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 
-@Experimental
 @Evolving
 /**
  * Interface used for arbitrary stateful operations with the v2 API to capture single value state.
  */
-private[sql] trait ValueState[S] extends Serializable {
+trait ValueState[S] extends Serializable {
 
   /** Whether state exists or not. */
   def exists(): Boolean
KeyValueGroupedDataset.scala
@@ -201,7 +201,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
   }
 
   /** @inheritdoc */
-  private[sql] def transformWithState[U: Encoder](
+  def transformWithState[U: Encoder](
       statefulProcessor: StatefulProcessor[K, V, U],
       timeMode: TimeMode,
       outputMode: OutputMode): Dataset[U] = {
@@ -219,7 +219,7 @@
   }
 
   /** @inheritdoc */
-  private[sql] def transformWithState[U: Encoder](
+  def transformWithState[U: Encoder](
       statefulProcessor: StatefulProcessor[K, V, U],
       eventTimeColumnName: String,
       outputMode: OutputMode): Dataset[U] = {
@@ -235,7 +235,7 @@
   }
 
   /** @inheritdoc */
-  private[sql] def transformWithState[U: Encoder, S: Encoder](
+  def transformWithState[U: Encoder, S: Encoder](
       statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
       timeMode: TimeMode,
       outputMode: OutputMode,
@@ -257,7 +257,7 @@
   }
 
   /** @inheritdoc */
-  private[sql] def transformWithState[U: Encoder, S: Encoder](
+  def transformWithState[U: Encoder, S: Encoder](
       statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
       eventTimeColumnName: String,
       outputMode: OutputMode,
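With the private[sql] qualifier gone from these overloads, user code can invoke the operator directly. A hedged usage sketch reusing the hypothetical RunningCountProcessor from above (the socket source and its options are illustrative only):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, TimeMode}

val spark = SparkSession.builder().appName("twsExample").getOrCreate()
import spark.implicits._

// Group a streaming Dataset[String] by key and apply the processor.
val counts = spark.readStream
  .format("socket").option("host", "localhost").option("port", 9999).load()
  .as[String]
  .groupByKey(identity)
  .transformWithState(new RunningCountProcessor(), TimeMode.None(), OutputMode.Update())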
TestGroupState.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.annotation.Evolving
 import org.apache.spark.api.java.Optional
 import org.apache.spark.sql.execution.streaming.GroupStateImpl
 import org.apache.spark.sql.execution.streaming.GroupStateImpl._
@@ -114,7 +114,6 @@ import org.apache.spark.sql.execution.streaming.GroupStateImpl._
  * Spark SQL types (see `Encoder` for more details).
  * @since 3.2.0
  */
-@Experimental
 @Evolving
 trait TestGroupState[S] extends GroupState[S] {
   /** Whether the state has been marked for removing */