Skip to content

Commit

Permalink
Added JLS_TWR_FLAG_DROP_ON_OVERFLOW which drops samples on overflow.
Browse files Browse the repository at this point in the history
* Added jls_twr_flags_get/set.
* Bumped version to 0.8.0.
  • Loading branch information
mliberty1 committed Sep 13, 2023
1 parent a08a828 commit 5927ca7
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 18 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
This file contains the list of changes made to the JLS project.


## 0.7.4
## 0.8.0

2023 Sep 13

* Added file truncation repair.
* Added jls_raw_truncate and jls_raw_backend.
* Refactored reader and writer into new core to enable repairer.
* Added jls_twr_flags_get/set.
* Added JLS_TWR_FLAG_DROP_ON_OVERFLOW which drops samples on overflow.


## 0.7.3
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ cmake_minimum_required (VERSION 3.10)
set(PARENT_PROJECT_DIR ${PROJECT_SOURCE_DIR})
set(CMAKE_OSX_ARCHITECTURES "arm64;x86_64" CACHE STRING "" FORCE) # universal2
project(JLS
VERSION 0.7.4
VERSION 0.8.0
LANGUAGES C)
SET(PROJECT_PREFIX JLS)
SET(VERSION_STRING "${PROJECT_VERSION}")
Expand Down
25 changes: 25 additions & 0 deletions include/jls/threaded_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ JLS_CPP_GUARD_START
/// Opaque JLS threaded writer object.
struct jls_twr_s;

/**
* @brief The threaded writer flags
*/
enum jls_twr_flag_e {
JLS_TWR_FLAG_DROP_ON_OVERFLOW = (1 << 0), ///< Drop on overflow when set, block otherwise.
};

/**
* @brief Open a JLS file for writing.
*
Expand All @@ -64,6 +71,24 @@ JLS_API int32_t jls_twr_open(struct jls_twr_s ** instance, const char * path);
*/
JLS_API int32_t jls_twr_close(struct jls_twr_s * self);

/**
* @param Get threaded writer flags.
*
* @param self The JLS writer instance from jls_twr_open().
* @param flags The jls_twr_flag_e bits.
* @return 0 or error code.
*/
JLS_API uint32_t jls_twr_flags_get(struct jls_twr_s * self);

/**
* @param Set threaded writer flags.
*
* @param self The JLS writer instance from jls_twr_open().
* @param flags The jls_twr_flag_e bits.
* @return 0 or error code.
*/
JLS_API int32_t jls_twr_flags_set(struct jls_twr_s * self, uint32_t flags);

/**
* @brief Flush a JLS file to disk.
*
Expand Down
4 changes: 2 additions & 2 deletions include/jls/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ JLS_CPP_GUARD_START

// Use version_update.py to update.
#define JLS_VERSION_MAJOR 0
#define JLS_VERSION_MINOR 7
#define JLS_VERSION_PATCH 4
#define JLS_VERSION_MINOR 8
#define JLS_VERSION_PATCH 0

/**
* \brief Macro to encode version to uint32_t.
Expand Down
12 changes: 12 additions & 0 deletions pyjls/binding.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ cdef class Writer:
cdef c_jls.jls_twr_s * _wr
cdef c_jls.jls_signal_def_s _signals[_JLS_SIGNAL_COUNT]

FLAG_DROP_ON_OVERFLOW = c_jls.JLS_TWR_FLAG_DROP_ON_OVERFLOW

def __init__(self, path: str):
cdef c_jls.jls_twr_s ** wr_ptr = &self._wr
cdef int32_t rc
Expand All @@ -293,6 +295,16 @@ cdef class Writer:
def __exit__(self, type, value, traceback):
self.close()

@property
def flags(self):
cdef c_jls.jls_twr_s * wr = self._wr
return c_jls.jls_twr_flags_get(wr)

@flags.setter
def flags(self, value):
cdef c_jls.jls_twr_s * wr = self._wr
c_jls.jls_twr_flags_set(wr, value)

def close(self):
"""Close the JLS file and release all resources."""
cdef c_jls.jls_twr_s * wr = self._wr
Expand Down
4 changes: 4 additions & 0 deletions pyjls/c_jls.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,12 @@ cdef extern from "jls/time.h":

cdef extern from "jls/threaded_writer.h":
struct jls_twr_s
enum jls_twr_flag_e:
JLS_TWR_FLAG_DROP_ON_OVERFLOW = (1 << 0)
int32_t jls_twr_open(jls_twr_s ** instance, const char * path) nogil
int32_t jls_twr_close(jls_twr_s * self) nogil
uint32_t jls_twr_flags_get(jls_twr_s * self)
int32_t jls_twr_flags_set(jls_twr_s * self, uint32_t flags)
int32_t jls_twr_flush(jls_twr_s * self) nogil
int32_t jls_twr_source_def(jls_twr_s * self, const jls_source_def_s * source)
int32_t jls_twr_signal_def(jls_twr_s * self, const jls_signal_def_s * signal)
Expand Down
2 changes: 1 addition & 1 deletion pyjls/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.


__version__ = "0.7.4"
__version__ = "0.8.0"

__title__ = "pyjls"
__description__ = 'Joulescope™ file format'
Expand Down
56 changes: 43 additions & 13 deletions src/threaded_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct jls_twr_s {
struct jls_bkt_s * bk; // REQUIRED first entry
struct jls_wr_s * wr;
volatile int quit;
uint32_t flags; // jls_twr_flags_e bits
volatile uint64_t flush_send_id;
volatile uint64_t flush_processed_id;
uint8_t fsr_entry_size_bits[JLS_SIGNAL_COUNT];
Expand Down Expand Up @@ -202,6 +203,7 @@ int32_t jls_twr_open(struct jls_twr_s ** instance, const char * path) {
return JLS_ERROR_NOT_ENOUGH_MEMORY;
}
self->quit = 0;
self->flags = 0;
self->wr = wr;
self->flush_send_id = 0;
self->flush_processed_id = 0;
Expand All @@ -218,21 +220,39 @@ int32_t jls_twr_open(struct jls_twr_s ** instance, const char * path) {
return 0;
}

int32_t msg_send(struct jls_twr_s * self, const struct msg_header_s * hdr, const uint8_t * payload, uint32_t payload_size) {
uint32_t jls_twr_flags_get(struct jls_twr_s * self) {
return self->flags;
}

int32_t jls_twr_flags_set(struct jls_twr_s * self, uint32_t flags) {
self->flags = flags;
return 0;
}

static int32_t msg_send_inner(struct jls_twr_s * self, const struct msg_header_s * hdr, const uint8_t * payload, uint32_t payload_size) {
int32_t rc = 0;
uint32_t sz = sizeof(*hdr) + payload_size;
for (uint32_t count = 0; count < (JLS_BK_MSG_WRITE_TIMEOUT_MS / 5); ++count) {
jls_bkt_msg_lock(self->bk);
uint8_t *msg = jls_mrb_alloc(&self->mrb, sz);
if (msg) {
memcpy(msg, hdr, sizeof(*hdr));
if (payload_size) {
memcpy(msg + sizeof(*hdr), payload, payload_size);
}
jls_bkt_msg_unlock(self->bk);
jls_bkt_msg_signal(self->bk);
jls_bkt_msg_lock(self->bk);
uint8_t *msg = jls_mrb_alloc(&self->mrb, sz);
if (msg) {
memcpy(msg, hdr, sizeof(*hdr));
if (payload_size) {
memcpy(msg + sizeof(*hdr), payload, payload_size);
}
} else {
rc = JLS_ERROR_BUSY;
}
jls_bkt_msg_unlock(self->bk);
return rc;
}

static int32_t msg_send(struct jls_twr_s * self, const struct msg_header_s * hdr, const uint8_t * payload, uint32_t payload_size) {
int64_t t_start = jls_now();
int64_t t_stop = t_start + JLS_TIME_MILLISECOND * (int64_t) JLS_BK_MSG_WRITE_TIMEOUT_MS;
while (jls_now() <= t_stop) {
if (0 == msg_send_inner(self, hdr, payload, payload_size)) {
return 0;
}
jls_bkt_msg_unlock(self->bk);
jls_bkt_sleep_ms(5);
}
return JLS_ERROR_BUSY;
Expand Down Expand Up @@ -321,7 +341,17 @@ int32_t jls_twr_fsr(struct jls_twr_s * self, uint16_t signal_id,
.d = 0
};
uint32_t length = (data_length * self->fsr_entry_size_bits[signal_id] + 7) / 8;
return msg_send(self, &hdr, (const uint8_t *) data, length);
int32_t rc;
if (self->flags & JLS_TWR_FLAG_DROP_ON_OVERFLOW) {
rc = msg_send_inner(self, &hdr, (const uint8_t *) data, length);
} else {
rc = msg_send(self, &hdr, (const uint8_t *) data, length);
}
if (rc) {
JLS_LOGW("signal %" PRIu16 " drop %" PRIu32 " samples @ %" PRIi64,
signal_id, data_length, sample_id);
}
return rc;
}

int32_t jls_twr_fsr_f32(struct jls_twr_s * self, uint16_t signal_id,
Expand Down

0 comments on commit 5927ca7

Please sign in to comment.