Skip to content

Commit

Permalink
3.5.1 beta (#188)
Browse files Browse the repository at this point in the history
* add github files

* client config as one json (#166)

* client config as one json

* remove logic

---------

Co-authored-by: Idan Asulin <[email protected]>

* test fat jar

* fix branch

* add plugin

* add archive option

* fix serve file

* fix

* test app

* fix

* fix

* Rnd 759 client parameters to store (#168)

* client config as one json

* remove logic

* remove superstream connection entry from configToSend

* update version beta file

* fix

* remove comments

* remove imports

* support update full config for client

* change the versions and pass client ip and host

* handle consumer full config update

* handle consumer full config update

* gradle_build_imports

* handle admin client update full config

---------

Co-authored-by: Idan Asulin <[email protected]>

* Rnd 759 client parameters to store (#169)

* client config as one json

* remove logic

* remove superstream connection entry from configToSend

* update version beta file

* fix

* remove comments

* remove imports

* support update full config for client

* change the versions and pass client ip and host

* handle consumer full config update

* handle consumer full config update

* gradle_build_imports

* handle admin client update full config

* back to 113

---------

Co-authored-by: Idan Asulin <[email protected]>

* revert none relevant changes into Jenkinsfile

* remove none relevant comments

* Rnd 759 client parameters to store (#172)

* client config as one json

* remove logic

* remove superstream connection entry from configToSend

* update version beta file

* fix

* remove comments

* remove imports

* support update full config for client

* change the versions and pass client ip and host

* handle consumer full config update

* handle consumer full config update

* gradle_build_imports

* handle admin client update full config

* back to 113

* buildNewGradle

* wait mechanism for canstart

* synchronized the other thread

* wait mechanism refactor

* remove comment

* beta version upgrade

---------

Co-authored-by: Idan Asulin <[email protected]>

* added SUPERSTREAM_DEBUG env var handle- disable and able all stdout (#173)

* added SUPERSTREAM_DEBUG env var handle- disable and able all stdout

* version beta

* version beta --> 3

* refactor with consts for initSuperstreamConfig method

* change SUPERSTREAM_DEBUG env var affect only for superstream stdout

* log for test

* revert test log

* change consts env var names

* revert partitions.contains

* serielizer/desirielizer handle for payload reduction. empty methods

* revert contains check

* stdout handle outside of superstream class

* changed superstream connection log in adnminKafka

* move log for Successfully connection to superstream after waitForStart

* move it again

* Rnd 955 support in changing client config parameters (#174)

* mechnisim of wait for superstream config and config bootstrap servers remove for test

* move place for test

* add getter to abstract config values. remove the bootstrap servers key-val

* set the superstream config vaues inside kafka producer config

* wait for super stream config move to super stream class, wait with object lock to support release cpu in wait interval

* refactor for waiting methods

* default timeout for superstream config

* move getter location

* list of supported client added in consts- we register clients only if type in the list

* to lower case added

* move type check to the AbstractConfig

* move import

* upgrade beta version-beta.conf

* Rnd 955 support in changing client config parameters (#175)

* mechnisim of wait for superstream config and config bootstrap servers remove for test

* move place for test

* add getter to abstract config values. remove the bootstrap servers key-val

* set the superstream config vaues inside kafka producer config

* wait for super stream config move to super stream class, wait with object lock to support release cpu in wait interval

* refactor for waiting methods

* default timeout for superstream config

* move getter location

* list of supported client added in consts- we register clients only if type in the list

* to lower case added

* move type check to the AbstractConfig

* move import

* upgrade beta version-beta.conf

* fix stdout when superstream failed initializing

* upgrade version beta

* remove_pr_template (#176)

* Remove pr template (#177)

* remove_pr_template

* move PULL_REQUEST_TEMPLATE

* remove unnessecary method (#179)

* SuperstreamObjectMapper for full config map to bytes (#182)

* SuperstreamObjectMapper for full config map to bytes

* SuperstreamObjectMapper for full config map to bytes small fix

* mapper change

* mapper testing problematic entry

* mapper testing different approach

* mapper testing different approach 2

* version beta upgrade

* clean imports

* versions upgrade

* SUPERSTREAM_DEBUG - default is false - will not see logs (#184)

* Fix client config update (#187)

* fix client config update + add deep copy

* fix null fullClientConfigs

* pr template

---------

Co-authored-by: idanasulinStrech <[email protected]>
Co-authored-by: liranbahar <[email protected]>
Co-authored-by: Idan Asulin <[email protected]>
Co-authored-by: Beka Kotchauri <[email protected]>
Co-authored-by: Beka Kotchauri <[email protected]>
  • Loading branch information
6 people authored Sep 15, 2024
1 parent 4431088 commit 4f18ef2
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 19 deletions.
21 changes: 11 additions & 10 deletions PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,42 @@ Please describe the tests that you ran to verify your changes. Please also note
- [ ] I have made corresponding changes to the knowledge base (if needed)
- [ ] My changes generate no new warnings
- [ ] I have verified that the specification is met and all functionalities are working as expected
- [ ] I resolved conflicts

## Reviewer Score - 0-100%
## Reviewer Score - 100%

- [ ] **Meeting Task Specifications (50%)**
- [x] **Meeting Task Specifications (50%)**
- This includes both UI design and backend functionality.
- Ensure that the task requirements are fully met and that the implementation aligns with the specifications provided.

- [ ] **Attention to Edge Cases (10%)**
- [x] **Attention to Edge Cases (10%)**
- Identify and handle edge cases that may not be immediately obvious.
- Demonstrate thorough testing and consideration of potential issues.

- [ ] **Writing Performant and Efficient Code (10%)**
- [x] **Writing Performant and Efficient Code (10%)**
- Optimize the code for performance and efficiency.
- Avoid unnecessary computations and strive for optimal resource usage.

- [ ] **Addressing Feedback from Previous Code Reviews (10%)**
- [x] **Addressing Feedback from Previous Code Reviews (10%)**
- Act on feedback provided in previous code reviews.
- Show improvement and a proactive approach to learning from past reviews.

- [ ] **Adherence to Coding Conventions (5%)**
- [x] **Adherence to Coding Conventions (5%)**
- Follow the established coding standards and guidelines.
- Maintain consistency in style and structure throughout the codebase.

- [ ] **Writing Readable Code (5%)**
- [x] **Writing Readable Code (5%)**
- Write code that is easy to read and understand.
- Use clear and meaningful variable names, and include comments where necessary.

- [ ] **Considering Aspects Not Explicitly Mentioned in the Specification (5%)**
- [x] **Considering Aspects Not Explicitly Mentioned in the Specification (5%)**
- Demonstrate initiative by considering aspects that may not be explicitly mentioned in the task specification.
- Enhance the implementation by thinking beyond the basic requirements.

- [ ] **Completing Pull Request Form (2.5%)**
- [x] **Completing Pull Request Form (2.5%)**
- Demonstrate initiative by considering aspects that may not be explicitly mentioned in the task specification.
- Enhance the implementation by thinking beyond the basic requirements.

- [ ] **Up to 2 Cycles of Code Review (2.5%)**
- [x] **Up to 2 Cycles of Code Review (2.5%)**
- Engage in up to two cycles of code review to refine and improve the code.
- Incorporate suggestions and resolve any identified issues.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.List;

public class Consts {
public static final String sdkVersion = "3.5.115";
public static final String sdkVersion = "3.5.116";
public static final String clientReconnectionUpdateSubject = "internal_tasks.clientReconnectionUpdate";
public static final String clientTypeUpdateSubject = "internal.clientTypeUpdate";
public static final String clientConfigUpdateSubject = "internal.clientConfigUpdate";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetAddress;
Expand Down Expand Up @@ -52,7 +56,7 @@ public class Superstream {
public String ConsumerSchemaID = "0";
public Map<String, Descriptors.Descriptor> SchemaIDMap = new HashMap<>();
public Map<String, Object> configs;
private Map<String, Object> fullClientConfigs;
private Map<String, Object> fullClientConfigs = new HashMap<>();
private Map<String, ?> superstreamConfigs;
public SuperstreamCounters clientCounters = new SuperstreamCounters();
private Subscription updatesSubscription;
Expand Down Expand Up @@ -84,7 +88,7 @@ public Superstream(String token, String host, Integer learningFactor, Map<String
this.learningFactor = learningFactor;
this.token = token;
this.host = host;
this.configs = configs;
this.configs = deepCopyMap(configs);
this.reductionEnabled = enableReduction;
this.type = type;
this.tags = tags;
Expand Down Expand Up @@ -275,6 +279,32 @@ public void registerClient(Map<String, ?> configs) {
superstreamPrintStream.println(String.format("superstream: %s", e.getMessage()));
}
}
public Map<String, Object> deepCopyMap(Map<String, ?> originalMap) {
Map<String, Object> copiedMap = new HashMap<>();

for (Map.Entry<String, ?> entry : originalMap.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
Object copiedValue;
copiedValue = deepCopyObject(value);
copiedMap.put(key, copiedValue);
}
return copiedMap;
}

@SuppressWarnings("unchecked")
private <T> T deepCopyObject(Object object) {
try {
ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(byteOut);
out.writeObject(object);
ByteArrayInputStream byteIn = new ByteArrayInputStream(byteOut.toByteArray());
ObjectInputStream in = new ObjectInputStream(byteIn);
return (T) in.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("Error during deep copy", e);
}
}

private Map<String, Object> populateConfigToSend(Map<String, ?> configs) {
Map<String, Object> configToSend = new HashMap<>();
Expand All @@ -284,7 +314,6 @@ private Map<String, Object> populateConfigToSend(Map<String, ?> configs) {
configToSend.put(entry.getKey(), entry.getValue());
}
}

}

return configToSend;
Expand Down Expand Up @@ -535,6 +564,11 @@ private void convertEntryValueWhenNoSerializer(Map<String, Object> config, Objec
if (config != null && !config.isEmpty()) {
for (Map.Entry<String, Object> entry : config.entrySet()) {
Object value = entry.getValue();
String key = entry.getKey();
if (key == "sasl.jaas.config"){
entry.setValue("[hidden]");
continue;
}
try {
mapper.writeValueAsBytes(value);
} catch (JsonProcessingException e) {
Expand Down Expand Up @@ -1084,8 +1118,18 @@ public void updateTopicPartitions(String topic, Integer partition) {
partitions.add(partition);
}

public void setFullClientConfigs(Map<String, ?> configs) {
this.fullClientConfigs = (Map<String, Object>) configs;
public void setFullClientConfigs(Map<String, ?> configsUpdate) {
for (Map.Entry<String, ?> entry : configsUpdate.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
Object copiedValue;
if (this.configs.containsKey(key)) {
copiedValue = deepCopyObject(this.configs.get(key));
} else {
copiedValue = deepCopyObject(value);
}
this.fullClientConfigs.put(key, copiedValue);
}
executeSendClientConfigUpdateReqWithWait();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.5.115
3.5.116
2 changes: 1 addition & 1 deletion version-beta.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.5.115-beta
3.5.116-beta
2 changes: 1 addition & 1 deletion version.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.5.115
3.5.116

0 comments on commit 4f18ef2

Please sign in to comment.