Skip to content

Commit

Permalink
Update reader code to make it compatible with older traces
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Wang <[email protected]>
  • Loading branch information
wangvsa committed Aug 2, 2024
1 parent 7b9c6ec commit 5d01663
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 32 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ include_directories(${PROJECT_BINARY_DIR})
#------------------------------------------------------------------------------
set(RECORDER_VERSION_MAJOR "2")
set(RECORDER_VERSION_MINOR "5")
set(RECORDER_VERSION_PATCH "0")
set(RECORDER_VERSION_PATCH "1")
set(RECORDER_PACKAGE "recorder")
set(RECORDER_PACKAGE_NAME "RECORDER")
set(RECORDER_PACKAGE_VERSION "${RECORDER_VERSION_MAJOR}.${RECORDER_VERSION_MINOR}.${RECORDER_VERSION_PATCH}")
Expand Down
2 changes: 1 addition & 1 deletion include/recorder-logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*/
#define RECORDER_VERSION_MAJOR 2
#define RECORDER_VERSION_MINOR 5
#define RECORDER_VERSION_PATCH 0
#define RECORDER_VERSION_PATCH 1

#define RECORDER_POSIX 0
#define RECORDER_MPIIO 1
Expand Down
48 changes: 48 additions & 0 deletions tools/reader-cst-cfg.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,54 @@ void reader_free_cfg(CFG* cfg) {
}
}


void reader_decode_cst_2_3(RecorderReader *reader, int rank, CST *cst) {
cst->rank = rank;
char cst_filename[1096] = {0};
sprintf(cst_filename, "%s/%d.cst", reader->logs_dir, rank);

FILE* f = fopen(cst_filename, "rb");

int key_len;
fread(&cst->entries, sizeof(int), 1, f);

cst->cs_list = malloc(cst->entries * sizeof(CallSignature));

for(int i = 0; i < cst->entries; i++) {
fread(&cst->cs_list[i].terminal_id, sizeof(int), 1, f);
fread(&cst->cs_list[i].key_len, sizeof(int), 1, f);

cst->cs_list[i].key = malloc(cst->cs_list[i].key_len);
fread(cst->cs_list[i].key, 1, cst->cs_list[i].key_len, f);

assert(cst->cs_list[i].terminal_id < cst->entries);
}
fclose(f);
}

void reader_decode_cfg_2_3(RecorderReader *reader, int rank, CFG* cfg) {
cfg->rank = rank;
char cfg_filename[1096] = {0};
sprintf(cfg_filename, "%s/%d.cfg", reader->logs_dir, rank);

FILE* f = fopen(cfg_filename, "rb");

fread(&cfg->rules, sizeof(int), 1, f);

cfg->cfg_head = NULL;
for(int i = 0; i < cfg->rules; i++) {
RuleHash *rule = malloc(sizeof(RuleHash));

fread(&(rule->rule_id), sizeof(int), 1, f);
fread(&(rule->symbols), sizeof(int), 1, f);

rule->rule_body = (int*) malloc(sizeof(int)*rule->symbols*2);
fread(rule->rule_body, sizeof(int), rule->symbols*2, f);
HASH_ADD_INT(cfg->cfg_head, rule_id, rule);
}
fclose(f);
}

void reader_decode_cst(int rank, void* buf, CST* cst) {
cst->rank = rank;

Expand Down
2 changes: 2 additions & 0 deletions tools/reader-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ typedef struct IntervalsMap_t {
* recorder_get_cst_cfg() can be used to perform
* custom tasks with CST and CFG
*/
void reader_decode_cst_2_3(RecorderReader *reader, int rank, CST *cst);
void reader_decode_cfg_2_3(RecorderReader *reader, int rank, CFG *cfg);
void reader_decode_cst(int rank, void* buf, CST* cst);
void reader_decode_cfg(int rank, void* buf, CFG* cfg);
void reader_free_cst(CST *cst);
Expand Down
123 changes: 94 additions & 29 deletions tools/reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,21 @@ void* read_zlib(FILE* source) {
return decompressed;
}

void check_version(RecorderReader* reader) {
void check_version(RecorderReader* reader, int* v_major, int* v_minor) {
char version_file[1096] = {0};
sprintf(version_file, "%s/VERSION", reader->logs_dir);

FILE* fp = fopen(version_file, "r");
assert(fp != NULL);
int major, minor, patch;
fscanf(fp, "%d.%d.%d", &major, &minor, &patch);
if(major != RECORDER_VERSION_MAJOR || minor != RECORDER_VERSION_MINOR) {
fprintf(stderr, "incompatible version: file=%d.%d.%d != reader=%d.%d.%d\n",
*v_major = major;
*v_minor = minor;

double v1 = major + minor/10.0;
double v2 = RECORDER_VERSION_MAJOR + RECORDER_VERSION_MINOR/10.0;
if (v1 > v2) {
fprintf(stderr, "incompatible version: trace=%d.%d.%d > reader=%d.%d.%d\n",
major, minor, patch, RECORDER_VERSION_MAJOR,
RECORDER_VERSION_MINOR, RECORDER_VERSION_PATCH);
exit(1);
Expand All @@ -88,7 +93,34 @@ void read_metadata(RecorderReader* reader) {

FILE* fp = fopen(metadata_file, "rb");
assert(fp != NULL);
fread(&reader->metadata, sizeof(reader->metadata), 1, fp);
if (reader->trace_version_major == 2 && reader->trace_version_minor == 3) {
struct RecorderMetadata_2_3 {
int total_ranks;
double start_ts;
double time_resolution;
int ts_buffer_elements;
int ts_compression_algo; // timestamp compression algorithm
};
struct RecorderMetadata_2_3 metadata_2_3;
fread(&metadata_2_3, sizeof(metadata_2_3), 1, fp);
reader->metadata.total_ranks = metadata_2_3.total_ranks;
reader->metadata.posix_tracing = 1;
reader->metadata.mpi_tracing = 1;
reader->metadata.mpiio_tracing = 1;
reader->metadata.hdf5_tracing = 1;
reader->metadata.store_tid = 1;
reader->metadata.store_call_depth = 1;
reader->metadata.start_ts = metadata_2_3.start_ts;
reader->metadata.time_resolution = metadata_2_3.time_resolution;
reader->metadata.ts_buffer_elements= metadata_2_3.ts_buffer_elements;
reader->metadata.interprocess_compression = 0;
reader->metadata.interprocess_pattern_recognition = 0;
reader->metadata.intraprocess_pattern_recognition = 0;
reader->metadata.ts_compression = 0;
} else {
fread(&reader->metadata, sizeof(reader->metadata), 1, fp);
}


long pos = ftell(fp);
fseek(fp, 0, SEEK_END);
Expand Down Expand Up @@ -132,7 +164,7 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) {
reader->hdf5_start_idx = -1;
reader->prev_tstart = 0.0;

check_version(reader);
check_version(reader, &reader->trace_version_major, &reader->trace_version_minor);

read_metadata(reader);

Expand Down Expand Up @@ -184,23 +216,33 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) {

} else {
for(int rank = 0; rank < nprocs; rank++) {
char cst_fname[1096] = {0};
sprintf(cst_fname, "%s/%d.cst", reader->logs_dir, rank);
FILE* cst_file = fopen(cst_fname, "rb");
void* buf_cst = read_zlib(cst_file);
reader->csts[rank] = (CST*) malloc(sizeof(CST));
reader_decode_cst(rank, buf_cst, reader->csts[rank]);
free(buf_cst);
fclose(cst_file);

char cfg_fname[1096] = {0};
sprintf(cfg_fname, "%s/%d.cfg", reader->logs_dir, rank);
FILE* cfg_file = fopen(cfg_fname, "rb");
void* buf_cfg = read_zlib(cfg_file);
reader->cfgs[rank] = (CFG*) malloc(sizeof(CFG));
reader_decode_cfg(rank, buf_cfg, reader->cfgs[rank]);
free(buf_cfg);
fclose(cfg_file);
if (reader->trace_version_major == 2 && reader->trace_version_minor == 3) {
reader->csts[rank] = (CST*) malloc(sizeof(CST));
reader_decode_cst_2_3(reader, rank, reader->csts[rank]);
} else {
char cst_fname[1096] = {0};
sprintf(cst_fname, "%s/%d.cst", reader->logs_dir, rank);
FILE* cst_file = fopen(cst_fname, "rb");
void* buf_cst = read_zlib(cst_file);
reader->csts[rank] = (CST*) malloc(sizeof(CST));
reader_decode_cst(rank, buf_cst, reader->csts[rank]);
free(buf_cst);
fclose(cst_file);
}

if (reader->trace_version_major == 2 && reader->trace_version_minor == 3) {
reader->cfgs[rank] = (CFG*) malloc(sizeof(CFG));
reader_decode_cfg_2_3(reader, rank, reader->cfgs[rank]);
} else {
char cfg_fname[1096] = {0};
sprintf(cfg_fname, "%s/%d.cfg", reader->logs_dir, rank);
FILE* cfg_file = fopen(cfg_fname, "rb");
void* buf_cfg = read_zlib(cfg_file);
reader->cfgs[rank] = (CFG*) malloc(sizeof(CFG));
reader_decode_cfg(rank, buf_cfg, reader->cfgs[rank]);
free(buf_cfg);
fclose(cfg_file);
}
}
}
}
Expand Down Expand Up @@ -293,16 +335,27 @@ void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, u
}
}

void decode_records_core(RecorderReader *reader, int rank,
void (*user_op)(Record*, void*), void* user_arg, bool free_record) {
// caller must free the timestamp
// buffer after use
uint32_t* read_timestamp_file(RecorderReader* reader, int rank) {
char ts_fname[1096] = {0};

int nprocs = reader->metadata.total_ranks;
CST* cst = reader_get_cst(reader, rank);
CFG* cfg = reader_get_cfg(reader, rank);
if (reader->trace_version_major==2 && reader->trace_version_minor==3) {
sprintf(ts_fname, "%s/%d.ts", reader->logs_dir, rank);
FILE* ts_file = fopen(ts_fname, "rb");
fseek(ts_file, 0, SEEK_END);
long filesize = ftell(ts_file);
fseek(ts_file, 0, SEEK_CUR);

reader->prev_tstart = 0.0;
uint32_t* ts_buf = (uint32_t*) malloc(filesize);
fread(ts_buf, 1, filesize, ts_file);
fclose(ts_file);

return ts_buf;
}

int nprocs = reader->metadata.total_ranks;

char ts_fname[1096] = {0};
sprintf(ts_fname, "%s/recorder.ts", reader->logs_dir);
FILE* ts_file = fopen(ts_fname, "rb");

Expand All @@ -329,6 +382,18 @@ void decode_records_core(RecorderReader *reader, int rank,
fread(ts_buf, 1, buf_sizes[rank], ts_file);
}
fclose(ts_file);
}


void decode_records_core(RecorderReader *reader, int rank,
void (*user_op)(Record*, void*), void* user_arg, bool free_record) {

CST* cst = reader_get_cst(reader, rank);
CFG* cfg = reader_get_cfg(reader, rank);

reader->prev_tstart = 0.0;

uint32_t* ts_buf = read_timestamp_file(reader, rank);

rule_application(reader, cfg, cst, -1, ts_buf, user_op, user_arg, free_record);

Expand Down
3 changes: 3 additions & 0 deletions tools/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ typedef struct RecorderReader_t {
// and cfgs[rank].
CST** csts;
CFG** cfgs;

int trace_version_major;
int trace_version_minor;
} RecorderReader;


Expand Down
3 changes: 3 additions & 0 deletions tools/recorder-summary.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ int main(int argc, char **argv) {

RecorderReader reader;
recorder_init_reader(argv[optind], &reader);

CST* cst = reader_get_cst(&reader, 0);
print_metadata(&reader);
print_statistics(&reader, cst);
printf("here2\n");
fflush(stdout);

if (show_cst) {
print_cst(&reader, cst);
Expand Down
1 change: 0 additions & 1 deletion tools/recorder2text.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ int main(int argc, char **argv) {
int end_rank = min(reader.metadata.total_ranks, n*(mpi_rank+1));

for(int rank = start_rank; rank < end_rank; rank++) {

sprintf(textfile_path, formatting_fname, textfile_dir, rank);
FILE* fout = fopen(textfile_path, "w");

Expand Down

0 comments on commit 5d01663

Please sign in to comment.