Skip to content

Commit

Permalink
Fix kafka transmission error.
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed Jun 3, 2024
1 parent 7664592 commit 0614ed5
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import lombok.Setter;

import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -71,9 +70,6 @@ public class Application {
// Unique application identifier.
public static final String APP_ID = UUID.randomUUID().toString();

// Byte representation of the application ID.
public static final byte[] APP_ID_BYTES = APP_ID.getBytes(StandardCharsets.UTF_8);

// Name of the application.
@Setter
@Config("name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.jd.live.agent.core.inject.annotation.Inject;
import com.jd.live.agent.core.inject.annotation.InjectLoader;
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.core.instance.Application;
import com.jd.live.agent.core.plugin.definition.InterceptorDefinition;
import com.jd.live.agent.core.plugin.definition.InterceptorDefinitionAdapter;
import com.jd.live.agent.core.plugin.definition.PluginDefinition;
Expand All @@ -47,13 +48,16 @@ public class KafkaConsumerRecordDefinition extends PluginDefinitionAdapter {
@InjectLoader(ResourcerType.CORE_IMPL)
private List<CargoRequire> requires;

@Inject(Application.COMPONENT_APPLICATION)
private Application application;

public KafkaConsumerRecordDefinition() {
this.matcher = () -> MatcherBuilder.named(TYPE_CONSUMER_RECORD);
this.interceptors = new InterceptorDefinition[]{
new InterceptorDefinitionAdapter(
MatcherBuilder.named(METHOD_VALUE).
and(MatcherBuilder.arguments(0)),
() -> new KafkaConsumerRecordInterceptor(requires))};
() -> new KafkaConsumerRecordInterceptor(requires, application))};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ public class KafkaConsumerRecordInterceptor extends InterceptorAdaptor {

private final CargoRequire require;

public KafkaConsumerRecordInterceptor(List<CargoRequire> requires) {
private final Application application;

public KafkaConsumerRecordInterceptor(List<CargoRequire> requires, Application application) {
this.require = new CargoRequires(requires);
this.application = application;
}

@Override
Expand All @@ -46,9 +49,10 @@ public void onEnter(ExecutableContext ctx) {
private void restoreTag(ConsumerRecord<?, ?> record) {
Headers headers = record.headers();
Header header = headers.lastHeader(Cargo.KEY_TAG_RESTORED_BY);
if (header != null && !Arrays.equals(Application.APP_ID_BYTES, header.value())) {
byte[] names = application.getUniqueThreadName().getBytes(StandardCharsets.UTF_8);
if (!Arrays.equals(names, header == null ? null : header.value())) {
headers.remove(Cargo.KEY_TAG_RESTORED_BY);
headers.add(Cargo.KEY_TAG_RESTORED_BY, Application.APP_ID_BYTES);
headers.add(Cargo.KEY_TAG_RESTORED_BY, names);
RequestContext.create().addCargo(require, headers, Header::key, this::getValue);
}

Expand Down

0 comments on commit 0614ed5

Please sign in to comment.