Add Tests for Storage Extension and File Rotation #1427

Merged · 11 commits · Oct 13, 2023
@@ -1,44 +1,52 @@
package software.amazon.adot.testbed;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.file.Path;
import java.nio.file.Files;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.MountableFile;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsRequest;

import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers(disabledWithoutDocker = true)
@@ -49,10 +57,10 @@ class LogsTests {
: "public.ecr.aws/aws-observability/aws-otel-collector:latest";
private final Logger collectorLogger = LoggerFactory.getLogger("collector");
private static final String uniqueID = UUID.randomUUID().toString();

private Path logDirectory;
private GenericContainer<?> collector;
Comment on lines +60 to 61

Contributor: How does test parallelism work with the framework we are using? Does having these fields at class scope have the potential to cause conflicts if test cases are run in parallel?

Contributor Author: I tried running with ./gradlew test --rerun-tasks --info --parallel so that the optimal number of threads is used, and no conflicts were seen. We also use a specific path/file within the directory, so running in parallel shouldn't be a problem IMO.
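
A minimal sketch, not part of the PR, of how JUnit 5's @TempDir could give every test method its own directory if parallel execution ever becomes a concern; the class and method names below are illustrative:

    import java.nio.file.Files;
    import java.nio.file.Path;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.io.TempDir;

    class TempDirIsolationSketch {
        @Test
        void eachTestGetsItsOwnDirectory(@TempDir Path perTestLogDirectory) throws Exception {
            // JUnit creates (and later deletes) a fresh directory per test method, so tests
            // running in parallel never share files the way a class-level logDirectory field can.
            Path logFile = Files.createFile(perTestLogDirectory.resolve("storageExtension.log"));
            Files.writeString(logFile, "First Message, collector is running\n");
        }
    }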


private GenericContainer<?> createAndStartCollector(String configFilePath, String logFilePath, String logStreamName) throws IOException {
private GenericContainer<?> createAndStartCollector(String configFilePath, String logStreamName) throws IOException {

// Create an environment variable map
Map<String, String> envVariables = new HashMap<>();
@@ -65,61 +73,164 @@ private GenericContainer<?> createAndStartCollector(String configFilePath, Strin
if (System.getenv("AWS_SESSION_TOKEN") != null) {
envVariables.put("AWS_SESSION_TOKEN", System.getenv("AWS_SESSION_TOKEN"));
}

try {
logDirectory = Files.createTempDirectory("tempLogs");
} catch (IOException e) {
throw new RuntimeException("Failed to create log directory", e);
}
var collector = new GenericContainer<>(TEST_IMAGE)
.withCopyFileToContainer(MountableFile.forClasspathResource(configFilePath), "/etc/collector/config.yaml")
.withLogConsumer(new Slf4jLogConsumer(collectorLogger))
.waitingFor(Wait.forLogMessage(".*Everything is ready. Begin running and processing data.*", 1))
.withCommand("--config", "/etc/collector/config.yaml", "--feature-gates=+adot.receiver.filelog,+adot.exporter.awscloudwatchlogs,+adot.extension.file_storage")
.withEnv(envVariables);
.withEnv(envVariables)
.withClasspathResourceMapping("/logs", "/logs", BindMode.READ_WRITE)
.withCommand("--config", "/etc/collector/config.yaml", "--feature-gates=+adot.filelog.receiver,+adot.awscloudwatchlogs.exporter,+adot.file_storage.extension");

//Mount the log file for the file log receiver to parse
collector.withCopyFileToContainer(MountableFile.forClasspathResource(logFilePath), logFilePath );
//Mount the Temp directory
collector.withFileSystemBind(logDirectory.toString(),"/tempLogs", BindMode.READ_WRITE);

collector.start();
collector.waitingFor(Wait.forHealthcheck());
return collector;
}

@Test
void testSyslog() throws Exception {
String logStreamName = "rfcsyslog-logstream-" + uniqueID;
collector = createAndStartCollector("/configurations/config-rfcsyslog.yaml", "/logs/RFC5424.log", logStreamName);
collector = createAndStartCollector("/configurations/config-rfcsyslog.yaml", logStreamName);

validateLogs(logStreamName , "/logs/RFC5424.log");
List<String> logFilePaths = new ArrayList<>();
logFilePaths.add("/logs/RFC5424.log");
validateLogs(logStreamName , logFilePaths);
collector.stop();
}

@Test
void testLog4j() throws Exception {
String logStreamName = "log4j-logstream-" + uniqueID;
collector = createAndStartCollector("/configurations/config-log4j.yaml", "/logs/log4j.log", logStreamName);
collector = createAndStartCollector("/configurations/config-log4j.yaml", logStreamName);

validateLogs(logStreamName , "/logs/log4j.log");
List<String> logFilePaths = new ArrayList<>();
logFilePaths.add("/logs/log4j.log");
validateLogs(logStreamName , logFilePaths);
collector.stop();
}

@Test
void testJson() throws Exception {
String logStreamName = "json-logstream-" + uniqueID;
collector = createAndStartCollector("/configurations/config-json.yaml", "/logs/testingJSON.log", logStreamName);
collector = createAndStartCollector("/configurations/config-json.yaml", logStreamName);

List<String> logFilePaths = new ArrayList<>();
logFilePaths.add("/logs/testingJSON.log");
validateLogs(logStreamName , logFilePaths);
collector.stop();
}

@Test
void testCollectorRestartStorageExtension() throws Exception {
String logStreamName = "storageExtension-logstream-" + uniqueID;
collector = createAndStartCollector("/configurations/config-storageExtension.yaml", logStreamName);
File tempFile = new File(logDirectory.toString(), "storageExtension.log");
Thread.sleep(5000);
Contributor: Why sleep here?

Contributor Author: The sleep is similar to the one referenced here; it just waits for the collector to start watching the file.
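
// Sketch only, not the PR's code: instead of the fixed five-second sleep, the test could poll the
// collector's container logs until the filelog receiver reports that it is watching the file.
// The exact "Started watching file" message text is an assumption about the receiver's output.
Instant watchDeadline = Instant.now().plus(Duration.ofSeconds(30));
while (!collector.getLogs().contains("Started watching file") && Instant.now().isBefore(watchDeadline)) {
    TimeUnit.MILLISECONDS.sleep(500);
}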


FileWriter fileWriter = new FileWriter(tempFile);
try {
fileWriter.write("First Message, collector is running" + "\n");
fileWriter.write("Second Message, collector is running" + "\n");
fileWriter.write("Third Message, collector is running" + "\n");
} catch (IOException e) {
throw new RuntimeException("Error writing to File A.", e);
}
fileWriter.flush();

List<String> logFilePaths = new ArrayList<>();
String expectedLogPath = logDirectory.toString();
logFilePaths.add(expectedLogPath + "/storageExtension.log");

validateLogs(logStreamName , logFilePaths);
Contributor: Why do we need to validate logs twice? Could we run through the start -> write -> stop -> write -> start -> stop cycle and then validate once?

Member: validateLogs is behaving like a sleep here; we need to make sure that the logs were read before stopping the collector.

Contributor Author (vasireddy99, Oct 13, 2023): Agreed that we need to make sure the logs were read before stopping the collector. After the changes it now does start -> write -> validate -> stop -> write -> start -> stop, and then validates again.

collector.stop();

// write to the file when collector is stopped
try {
fileWriter.write("First Message after collector is stopped" + "\n");
fileWriter.write("Second Message after the collector is stopped" + "\n");
fileWriter.write("Third Message after the collector is stopped" + "\n");
fileWriter.flush();
Contributor: Why is flush called inside the try block here but outside of it on line 144?

Contributor Author: Actually no specific reason; modified.

} catch (IOException e) {
throw new RuntimeException("Error writing to File A.", e);
}
fileWriter.close();
//restarting the collector
collector.start();

validateLogs(logStreamName , logFilePaths);
collector.stop();
}
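
Related to the flush discussion above, a minimal sketch (not code from the PR) showing that try-with-resources closes the FileWriter automatically and that close() flushes buffered data, so explicit flush()/close() calls inside such a block are redundant:

    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;

    class WriterFlushSketch {
        static void writeLines(File target, String... messages) {
            // The writer is closed when the try block exits, and closing flushes any buffered
            // data, so no explicit flush() or close() call is needed inside the block.
            try (FileWriter fileWriter = new FileWriter(target)) {
                for (String message : messages) {
                    fileWriter.write(message + "\n");
                }
            } catch (IOException e) {
                throw new RuntimeException("Error writing to " + target, e);
            }
        }
    }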

void validateLogs(String testLogStreamName, String logFilePath) throws Exception {
var file = new File(logFilePath);
@Test
void testFileRotation() throws Exception {
String logStreamName = "fileRotation-logstream-" + uniqueID;
collector = createAndStartCollector("/configurations/config-fileRotation.yaml", logStreamName);

Thread.sleep(5000);

// Create and write data to File A
File tempFile = new File(logDirectory.toString(), "testlogA.log");
try (FileWriter fileWriter = new FileWriter(tempFile)) {
fileWriter.write("Message in File A" + "\n");
fileWriter.flush();
fileWriter.close();
} catch (IOException e) {
throw new RuntimeException("Error writing to File A.", e);
}
Thread.sleep(5000);
Member: You can validate the log entries written so far here, so that you don't need to add a sleep; that will block in a safe way before you move on with the rest of the test.

Contributor Author: I agree that validation can be done, but the sleep here is to give enough time for the file operations to finish. Even if we do the validation without the sleep, the log file has not been generated yet, so it would lead to a FileNotFoundException.

Contributor: Is writing to the file a blocking operation? Why do you need to wait? What file operations are you referring to?

Member: What operations need to be done?

Contributor Author: By file operations I mean creating and writing data to File A, etc.; creating and writing to the file takes some time. Without the Thread.sleep the collector is not able to parse the log line, since the file testlogA.log does not have the line written to it yet:

        java.lang.AssertionError:
        Expecting actual:
          ["Message in renamed file - line 1",
            "Message in renamed file - line 2",
            "Message in renamed file - line 3"]
        to contain exactly in any order:
          ["Message in File A",
            "Message in renamed file - line 1",
            "Message in renamed file - line 2",
            "Message in renamed file - line 3"]
        but could not find the following elements:
          ["Message in File A"]

Member: That is why I suggested adding a validateLogs call instead of a Thread.sleep, to validate only "Message in File A". It will block until the lines are sent to CloudWatch Logs, which guarantees that it is safe to continue with the test.

Contributor Author: Committed the changes.
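
// Sketch of the reviewer's suggestion above, not necessarily the merged code: validate only the
// line written so far before rotating the file, so the test blocks until "Message in File A" has
// reached CloudWatch instead of relying on a fixed sleep. The firstBatch variable is illustrative.
List<String> firstBatch = new ArrayList<>();
firstBatch.add(logDirectory.toString() + "/testlogA.log");
validateLogs(logStreamName, firstBatch);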


File renameFile = new File(logDirectory.toString(), "testlogA-1234.log");
tempFile.renameTo(renameFile);

File tempFileB = new File(logDirectory.toString(), "testlogA.log");
try (FileWriter newfileWriter = new FileWriter(tempFileB)) {
newfileWriter.write("Message in renamed file - line 1" + "\n");
newfileWriter.write("Message in renamed file - line 2" + "\n");
newfileWriter.write("Message in renamed file - line 3" + "\n");
newfileWriter.flush();
newfileWriter.close();
} catch (IOException e) {
throw new RuntimeException("Error writing to File B.", e);
}

List<String> logFilePaths = new ArrayList<>();
String expectedLogPath = logDirectory.toString();
logFilePaths.add(expectedLogPath + "/testlogA-1234.log");
logFilePaths.add(expectedLogPath + "/testlogA.log");

validateLogs(logStreamName, logFilePaths);

collector.stop();
}


void validateLogs(String testLogStreamName, List<String> logFilePaths) throws Exception {
var lines = new HashSet<String>();

try (InputStream inputStream = getClass().getResourceAsStream(logFilePath);
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
while ((line = reader.readLine()) != null) {
lines.add(line);
for (String logFilePath : logFilePaths) {
InputStream inputStream;
//Check whether the filePath points into the test resources folder.
if (getClass().getResource(logFilePath) != null) {
Member: This is still the boolean-flag behaviour, just disguised. My suggestion was to change the function signature to

void validateLogs(String testLogStreamName, List<InputStream> inputStreams) throws Exception {

Another option is to create two functions: one used only for resource paths and another used only for local paths.

Contributor Author: Oh, now I see the suggestion. Modified.

inputStream = getClass().getResourceAsStream(logFilePath);
} else {
inputStream = new FileInputStream(logFilePath);
}

try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
while ((line = reader.readLine()) != null) {
lines.add(line);
}
} catch (IOException e) {
throw new RuntimeException("Error reading from the file: " + logFilePath, e);
}
} catch (IOException e) {
throw new RuntimeException("Error reading from the file: " + logFilePath, e);
}

var cwClient = CloudWatchLogsClient.builder()
@@ -160,6 +271,13 @@ void validateLogs(String testLogStreamName, String logFilePath) throws Exception
.filter(Objects::nonNull)
.collect(Collectors.toSet());

//Print
System.out.println("Actual Logs - Log lines From Cloudwatch");
messageToValidate.forEach(System.out::println);

System.out.println("Expected logs- Log lines from log file");
lines.forEach(System.out::println);
Contributor: Let's be more thoughtful about what we are printing to stdout. We should only need to print if there is a failure, and in that case the output should be descriptive enough to tell us what was expected and what was actually given.

Contributor Author: Sure, I had added it as an easy way to see what is being validated. Since we only need output on failure, and the assertions already log the actual/expected values when they fail, I removed the print statements.

//Validate the body field of each JSON message (messageToValidate) against the actual log lines from the log file.
assertThat(messageToValidate.containsAll(lines)).isTrue();
assertThat(messageToValidate).containsExactlyInAnyOrderElementsOf(lines);
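
Related to the review suggestion about the validateLogs signature, a minimal sketch under stated assumptions (not the merged implementation) of how reading from InputStreams removes the resource-versus-local-file branch; callers would pass getClass().getResourceAsStream(...) for bundled log files and new FileInputStream(...) for files written to the temp directory:

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class ValidateLogsSignatureSketch {
        // The caller decides where the bytes come from; this helper only reads lines,
        // which is the part validateLogs would then compare against CloudWatch.
        Set<String> readExpectedLines(List<InputStream> inputStreams) throws Exception {
            var lines = new HashSet<String>();
            for (InputStream inputStream : inputStreams) {
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        lines.add(line);
                    }
                }
            }
            return lines;
        }
    }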
@@ -0,0 +1,15 @@
receivers:
filelog:
include: [/tempLogs/testlogA.log]

exporters:
awscloudwatchlogs:
log_group_name: "adot-testbed/logs-component-testing/logs"
log_stream_name: ${LOG_STREAM_NAME}
log_retention: 7

service:
pipelines:
logs:
receivers: [filelog]
exporters: [awscloudwatchlogs]
@@ -0,0 +1,21 @@
extensions:
file_storage:
directory: /logs/

receivers:
filelog:
include: [/tempLogs/storageExtension.log]
storage: file_storage

exporters:
awscloudwatchlogs:
log_group_name: "adot-testbed/logs-component-testing/logs"
log_stream_name: ${LOG_STREAM_NAME}
log_retention: 7

service:
extensions: [ file_storage ]
pipelines:
logs:
receivers: [filelog]
exporters: [awscloudwatchlogs]