Skip to content

Commit

Permalink
KAFKA-3439; Added exceptions thrown
Browse files Browse the repository at this point in the history
Author: Eno Thereska <[email protected]>

Reviewers: Guozhang Wang

Closes apache#1213 from enothereska/KAFKA-3439-throws
  • Loading branch information
enothereska authored and gwenshap committed Apr 12, 2016
1 parent 319c6e7 commit ba9456d
Show file tree
Hide file tree
Showing 39 changed files with 232 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {

/**
* Start the stream instance by starting all its threads.
* @throws IllegalStateException if process was already started
*/
public synchronized void start() {
log.debug("Starting Kafka Stream process");
Expand All @@ -157,6 +158,7 @@ public synchronized void start() {
/**
* Shutdown this stream instance by signaling all the threads to stop,
* and then wait for them to join.
* @throws IllegalStateException if process has not started yet
*/
public synchronized void close() {
log.debug("Stopping Kafka Stream process");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public AbstractStream(KStreamBuilder topology, String name, Set<String> sourceNo
this.sourceNodes = sourceNodes;
}

/**
* @throws TopologyBuilderException if the streams are not joinable
*/
protected Set<String> ensureJoinableWith(AbstractStream<K> other) {
Set<String> thisSourceNodes = sourceNodes;
Set<String> otherSourceNodes = other.sourceNodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public void configure(Map<String, ?> configs, boolean isKey) {
// do nothing
}

/**
* @throws StreamsException if both old and new values of data are null, or if
* both values are not null
*/
@Override
public byte[] serialize(String topic, Change<T> data) {
byte[] serializedKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public void init(ProcessorContext context) {
store = (KeyValueStore<K, T>) context.getStateStore(storeName);
}

/**
* @throws StreamsException if key is null
*/
@Override
public void process(K key, V value) {
// the keys should never be null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ public void writeAsText(String filePath) {
writeAsText(filePath, null, null);
}

/**
* @throws TopologyBuilderException if file is not found
*/
@Override
public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
String name = topology.newName(PRINTING_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {

private final String windowName;

/**
* @throws TopologyBuilderException if retention period of the join window is less than expected
*/
KStreamJoinWindow(String windowName, long windowSizeMs, long retentionPeriodMs) {
this.windowName = windowName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public void init(ProcessorContext context) {
otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
}

/**
* @throws StreamsException if key is null
*/
@Override
public void process(K key, V1 value) {
// the keys should never be null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public void init(ProcessorContext context) {
store = (KeyValueStore<K, V>) context.getStateStore(storeName);
}

/**
* @throws StreamsException if key is null
*/
@Override
public void process(K key, V value) {
// the keys should never be null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public void init(ProcessorContext context) {
store = (KeyValueStore<K, T>) context.getStateStore(storeName);
}

/**
* @throws StreamsException if key is null
*/
@Override
public void process(K key, Change<V> value) {
// the keys should never be null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ public void writeAsText(String filePath) {
writeAsText(filePath, null, null);
}

/**
* @throws TopologyBuilderException if file is not found
*/
@Override
public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
String name = topology.newName(PRINTING_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public void init(ProcessorContext context) {
valueGetter.init(context);
}

/**
* @throws StreamsException if key is null
*/
@Override
public void process(K key, Change<V1> change) {
// the keys should never be null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public void init(ProcessorContext context) {
valueGetter.init(context);
}

/**
* @throws StreamsException if key is null
*/
@Override
public void process(K key, Change<V1> change) {
// the keys should never be null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public void init(ProcessorContext context) {
valueGetter.init(context);
}

/**
* @throws StreamsException if key is null
*/
@Override
public void process(K key, Change<V1> change) {
// the keys should never be null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public void init(ProcessorContext context) {
valueGetter.init(context);
}

/**
* @throws StreamsException if key is null
*/
@Override
public void process(K key, Change<V1> change) {
// the keys should never be null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public void init(ProcessorContext context) {
store = (KeyValueStore<K, V>) context.getStateStore(storeName);
}

/**
* @throws StreamsException if key is null
*/
@Override
public void process(K key, Change<V> value) {
// the keys should never be null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public KTableValueGetter<K, KeyValue<K1, V1>> get() {
};
}

/**
* @throws IllegalStateException since this method should never be called
*/
@Override
public void enableSendingOldValues() {
// this should never be called
Expand All @@ -74,6 +77,9 @@ private KeyValue<K1, V1> computeValue(K key, V value) {

private class KTableMapProcessor extends AbstractProcessor<K, Change<V>> {

/**
* @throws StreamsException if key is null
*/
@Override
public void process(K key, Change<V> change) {
KeyValue<K1, V1> newPair = computeValue(key, change.newValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>
return Collections.unmodifiableMap(groups);
}

/**
* @throws StreamsException if no metadata can be received for a topic
*/
protected int maxNumPartitions(Cluster metadata, Set<String> topics) {
int maxNumPartitions = 0;
for (String topic : topics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public String toString() {
return topicGroupId + "_" + partition;
}

/**
* @throws TaskIdFormatException if the string is not a valid TaskId
*/
public static TaskId parse(String string) {
int index = string.indexOf('_');
if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException(string);
Expand All @@ -55,11 +58,17 @@ public static TaskId parse(String string) {
}
}

/**
* @throws IOException if cannot write to output stream
*/
public void writeTo(DataOutputStream out) throws IOException {
out.writeInt(topicGroupId);
out.writeInt(partition);
}

/**
* @throws IOException if cannot read from input stream
*/
public static TaskId readFrom(DataInputStream in) throws IOException {
return new TaskId(in.readInt(), in.readInt());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public final TopologyBuilder addSource(String name, String... topics) {
* {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
*/
public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
if (nodeFactories.containsKey(name))
Expand Down Expand Up @@ -328,6 +329,7 @@ public final TopologyBuilder addSink(String name, String topic, Serializer keySe
* @see #addSink(String, String, String...)
* @see #addSink(String, String, StreamPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
* @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
*/
public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
if (nodeFactories.containsKey(name))
Expand Down Expand Up @@ -359,6 +361,7 @@ public final <K, V> TopologyBuilder addSink(String name, String topic, Serialize
* @param parentNames the name of one or more source or processor nodes whose output messages this processor should receive
* and process
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
*/
public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
if (nodeFactories.containsKey(name))
Expand Down Expand Up @@ -386,6 +389,7 @@ public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplie
*
* @param supplier the supplier used to obtain this state store {@link StateStore} instance
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if state store supplier is already added
*/
public final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) {
if (stateFactories.containsKey(supplier.name())) {
Expand Down Expand Up @@ -438,6 +442,7 @@ public final TopologyBuilder connectProcessorAndStateStores(String processorName
*
* @param processorNames the name of the processors
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
*/
public final TopologyBuilder connectProcessors(String... processorNames) {
if (processorNames.length < 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public abstract class AbstractTask {
protected final Set<TopicPartition> partitions;
protected ProcessorContext processorContext;

/**
* @throws ProcessorStateException if the state manager cannot be created
*/
protected AbstractTask(TaskId id,
String applicationId,
Collection<TopicPartition> partitions,
Expand Down Expand Up @@ -101,6 +104,9 @@ public final ProcessorContext context() {

public abstract void commit();

/**
* @throws ProcessorStateException if there is an error while closing the state manager
*/
public void close() {
try {
stateMgr.close(recordCollectorOffsets());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public class InternalTopicManager {

private class ZKStringSerializer implements ZkSerializer {

/**
* @throws AssertionError if the byte String encoding type is not supported
*/
@Override
public byte[] serialize(Object data) {
try {
Expand All @@ -60,6 +63,9 @@ public byte[] serialize(Object data) {
}
}

/**
* @throws AssertionError if the byte String encoding type is not supported
*/
@Override
public Object deserialize(byte[] bytes) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class MinTimestampTracker<E> implements TimestampTracker<E> {
// record's timestamp
private long lastKnownTime = NOT_KNOWN;

/**
* @throws NullPointerException if the element is null
*/
public void addElement(Stamped<E> elem) {
if (elem == null) throw new NullPointerException();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ public long timestamp() {
return timestamp;
}

/**
* @throws IllegalStateException if the record's partition does not belong to this partition group
*/
public int numBuffered(TopicPartition partition) {
RecordQueue recordQueue = partitionQueues.get(partition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ public StreamsMetrics metrics() {
return metrics;
}

/**
* @throws IllegalStateException if this method is called before {@link #initialized()}
*/
@Override
public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
if (initialized)
Expand All @@ -113,6 +116,9 @@ public void register(StateStore store, boolean loggingEnabled, StateRestoreCallb
stateMgr.register(store, loggingEnabled, stateRestoreCallback);
}

/**
* @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node
*/
@Override
public StateStore getStateStore(String name) {
ProcessorNode node = task.node();
Expand All @@ -127,6 +133,9 @@ public StateStore getStateStore(String name) {
return stateMgr.getStore(name);
}

/**
* @throws IllegalStateException if the task's record is null
*/
@Override
public String topic() {
if (task.record() == null)
Expand All @@ -140,6 +149,9 @@ public String topic() {
return topic;
}

/**
* @throws IllegalStateException if the task's record is null
*/
@Override
public int partition() {
if (task.record() == null)
Expand All @@ -148,6 +160,9 @@ public int partition() {
return task.record().partition();
}

/**
* @throws IllegalStateException if the task's record is null
*/
@Override
public long offset() {
if (this.task.record() == null)
Expand All @@ -156,6 +171,9 @@ public long offset() {
return this.task.record().offset();
}

/**
* @throws IllegalStateException if the task's record is null
*/
@Override
public long timestamp() {
if (task.record() == null)
Expand Down
Loading

0 comments on commit ba9456d

Please sign in to comment.