Skip to content

Commit

Permalink
Disconnect if no data received in more than 10 minutes
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Feb 4, 2025
1 parent 548f914 commit ec035a8
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 40 deletions.
10 changes: 5 additions & 5 deletions components/stratum/include/stratum_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,19 @@ void STRATUM_V1_initialize_buffer();

char *STRATUM_V1_receive_jsonrpc_line(int sockfd);

int STRATUM_V1_subscribe(int socket, char * model);
int STRATUM_V1_subscribe(int socket, int send_uid, char * model);

void STRATUM_V1_parse(StratumApiV1Message *message, const char *stratum_json);

void STRATUM_V1_free_mining_notify(mining_notify *params);

int STRATUM_V1_authenticate(int socket, const char *username, const char *pass);
int STRATUM_V1_authenticate(int socket, int send_uid, const char *username, const char *pass);

int STRATUM_V1_configure_version_rolling(int socket, uint32_t * version_mask);
int STRATUM_V1_configure_version_rolling(int socket, int send_uid, uint32_t * version_mask);

int STRATUM_V1_suggest_difficulty(int socket, uint32_t difficulty);
int STRATUM_V1_suggest_difficulty(int socket, int send_uid, uint32_t difficulty);

int STRATUM_V1_submit_share(int socket, const char *username, const char *jobid,
int STRATUM_V1_submit_share(int socket, int send_uid, const char *username, const char *jobid,
const char *extranonce_2, const uint32_t ntime, const uint32_t nonce,
const uint32_t version);

Expand Down
34 changes: 12 additions & 22 deletions components/stratum/stratum_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,9 @@ static const char * TAG = "stratum_api";
static char * json_rpc_buffer = NULL;
static size_t json_rpc_buffer_size = 0;

// A message ID that must be unique per request that expects a response.
// For requests not expecting a response (called notifications), this is null.
static int send_uid = 1;

static void debug_stratum_tx(const char *);
int _parse_stratum_subscribe_result_message(const char * result_json_str, char ** extranonce, int * extranonce2_len);

void STRATUM_V1_reset_uid()
{
ESP_LOGI(TAG, "Resetting stratum uid");

send_uid = 1;
}

void STRATUM_V1_initialize_buffer()
{
json_rpc_buffer = malloc(BUFFER_SIZE);
Expand Down Expand Up @@ -319,31 +308,31 @@ int _parse_stratum_subscribe_result_message(const char * result_json_str, char *
return 0;
}

int STRATUM_V1_subscribe(int socket, char * model)
int STRATUM_V1_subscribe(int socket, int send_uid, char * model)
{
// Subscribe
char subscribe_msg[BUFFER_SIZE];
const esp_app_desc_t *app_desc = esp_app_get_description();
const char *version = app_desc->version;
sprintf(subscribe_msg, "{\"id\": %d, \"method\": \"mining.subscribe\", \"params\": [\"bitaxe/%s/%s\"]}\n", send_uid++, model, version);
sprintf(subscribe_msg, "{\"id\": %d, \"method\": \"mining.subscribe\", \"params\": [\"bitaxe/%s/%s\"]}\n", send_uid, model, version);
debug_stratum_tx(subscribe_msg);

return write(socket, subscribe_msg, strlen(subscribe_msg));
}

int STRATUM_V1_suggest_difficulty(int socket, uint32_t difficulty)
int STRATUM_V1_suggest_difficulty(int socket, int send_uid, uint32_t difficulty)
{
char difficulty_msg[BUFFER_SIZE];
sprintf(difficulty_msg, "{\"id\": %d, \"method\": \"mining.suggest_difficulty\", \"params\": [%ld]}\n", send_uid++, difficulty);
sprintf(difficulty_msg, "{\"id\": %d, \"method\": \"mining.suggest_difficulty\", \"params\": [%ld]}\n", send_uid, difficulty);
debug_stratum_tx(difficulty_msg);

return write(socket, difficulty_msg, strlen(difficulty_msg));
}

int STRATUM_V1_authenticate(int socket, const char * username, const char * pass)
int STRATUM_V1_authenticate(int socket, int send_uid, const char * username, const char * pass)
{
char authorize_msg[BUFFER_SIZE];
sprintf(authorize_msg, "{\"id\": %d, \"method\": \"mining.authorize\", \"params\": [\"%s\", \"%s\"]}\n", send_uid++, username,
sprintf(authorize_msg, "{\"id\": %d, \"method\": \"mining.authorize\", \"params\": [\"%s\", \"%s\"]}\n", send_uid, username,
pass);
debug_stratum_tx(authorize_msg);

Expand All @@ -356,25 +345,26 @@ int STRATUM_V1_authenticate(int socket, const char * username, const char * pass
/// @param ntime The hex-encoded time value use in the block header.
/// @param extranonce_2 The hex-encoded value of extra nonce 2.
/// @param nonce The hex-encoded nonce value to use in the block header.
int STRATUM_V1_submit_share(int socket, const char * username, const char * jobid, const char * extranonce_2, const uint32_t ntime,
const uint32_t nonce, const uint32_t version)
int STRATUM_V1_submit_share(int socket, int send_uid,const char * username, const char * jobid,
const char * extranonce_2, const uint32_t ntime,
const uint32_t nonce, const uint32_t version)
{
char submit_msg[BUFFER_SIZE];
sprintf(submit_msg,
"{\"id\": %d, \"method\": \"mining.submit\", \"params\": [\"%s\", \"%s\", \"%s\", \"%08lx\", \"%08lx\", \"%08lx\"]}\n",
send_uid++, username, jobid, extranonce_2, ntime, nonce, version);
send_uid, username, jobid, extranonce_2, ntime, nonce, version);
debug_stratum_tx(submit_msg);

return write(socket, submit_msg, strlen(submit_msg));
}

int STRATUM_V1_configure_version_rolling(int socket, uint32_t * version_mask)
int STRATUM_V1_configure_version_rolling(int socket, int send_uid, uint32_t * version_mask)
{
char configure_msg[BUFFER_SIZE * 2];
sprintf(configure_msg,
"{\"id\": %d, \"method\": \"mining.configure\", \"params\": [[\"version-rolling\"], {\"version-rolling.mask\": "
"\"ffffffff\"}]}\n",
send_uid++);
send_uid);
debug_stratum_tx(configure_msg);

return write(socket, configure_msg, strlen(configure_msg));
Expand Down
5 changes: 5 additions & 0 deletions main/global_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ typedef struct
bool new_stratum_version_rolling_msg;

int sock;

// A message ID that must be unique per request that expects a response.
// For requests not expecting a response (called notifications), this is null.
int send_uid;

bool ASIC_initalized;
bool psram_is_available;
} GlobalState;
Expand Down
1 change: 1 addition & 0 deletions main/tasks/asic_result_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ void ASIC_result_task(void *pvParameters)
char * user = GLOBAL_STATE->SYSTEM_MODULE.is_using_fallback ? GLOBAL_STATE->SYSTEM_MODULE.fallback_pool_user : GLOBAL_STATE->SYSTEM_MODULE.pool_user;
int ret = STRATUM_V1_submit_share(
GLOBAL_STATE->sock,
GLOBAL_STATE->send_uid++,
user,
GLOBAL_STATE->ASIC_TASK_MODULE.active_jobs[job_id]->jobid,
GLOBAL_STATE->ASIC_TASK_MODULE.active_jobs[job_id]->extranonce2,
Expand Down
72 changes: 59 additions & 13 deletions main/tasks/stratum_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#define MAX_RETRY_ATTEMPTS 3
#define MAX_CRITICAL_RETRY_ATTEMPTS 5

#define BUFFER_SIZE 1024

static const char * TAG = "stratum_task";

static StratumApiV1Message stratum_api_v1_message = {};
Expand All @@ -34,6 +36,16 @@ static SystemTaskModule SYSTEM_TASK_MODULE = {.stratum_difficulty = 8192};
static const char * primary_stratum_url;
static uint16_t primary_stratum_port;

struct timeval tcp_snd_timeout = {
.tv_sec = 5,
.tv_usec = 0
};

struct timeval tcp_rcv_timeout = {
.tv_sec = 60 * 10,
.tv_usec = 0
};

bool is_wifi_connected() {
wifi_ap_record_t ap_info;
if (esp_wifi_sta_get_ap_info(&ap_info) == ESP_OK) {
Expand All @@ -56,6 +68,13 @@ void cleanQueue(GlobalState * GLOBAL_STATE) {
pthread_mutex_unlock(&GLOBAL_STATE->valid_jobs_lock);
}

void stratum_reset_uid(GlobalState * GLOBAL_STATE)
{
ESP_LOGI(TAG, "Resetting stratum uid");
GLOBAL_STATE->send_uid = 1;
}


void stratum_close_connection(GlobalState * GLOBAL_STATE)
{
if (GLOBAL_STATE->sock < 0) {
Expand All @@ -80,6 +99,11 @@ void stratum_primary_heartbeat(void * pvParameters)
int addr_family = AF_INET;
int ip_protocol = IPPROTO_IP;

struct timeval tcp_timeout = {
.tv_sec = 5,
.tv_usec = 0
};

while (1)
{
if (GLOBAL_STATE->SYSTEM_MODULE.is_using_fallback == false) {
Expand Down Expand Up @@ -125,16 +149,34 @@ void stratum_primary_heartbeat(void * pvParameters)
vTaskDelay(60000 / portTICK_PERIOD_MS);
continue;
}

if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO , &tcp_timeout, sizeof(tcp_timeout)) != 0) {
ESP_LOGE(TAG, "Fail to setsockopt SO_RCVTIMEO ");
}

int send_uid = 1;
STRATUM_V1_subscribe(sock, send_uid++, GLOBAL_STATE->asic_model_str);
STRATUM_V1_authenticate(sock, send_uid++, GLOBAL_STATE->SYSTEM_MODULE.pool_user, GLOBAL_STATE->SYSTEM_MODULE.pool_pass);

char recv_buffer[BUFFER_SIZE];
memset(recv_buffer, 0, BUFFER_SIZE);
int bytes_received = recv(sock, recv_buffer, BUFFER_SIZE - 1, 0);

shutdown(sock, SHUT_RDWR);
close(sock);

if (GLOBAL_STATE->SYSTEM_MODULE.is_using_fallback) {
if (bytes_received == -1) {
vTaskDelay(60000 / portTICK_PERIOD_MS);
continue;
}

if (strstr(recv_buffer, "mining.notify") != NULL) {
ESP_LOGI(TAG, "Heartbeat successful and in fallback mode. Switching back to primary.");
GLOBAL_STATE->SYSTEM_MODULE.is_using_fallback = false;
stratum_close_connection(GLOBAL_STATE);
vTaskDelay(60000 / portTICK_PERIOD_MS);
continue;
}

vTaskDelay(60000 / portTICK_PERIOD_MS);
}
}
Expand All @@ -154,11 +196,8 @@ void stratum_task(void * pvParameters)
int ip_protocol = IPPROTO_IP;
int retry_attempts = 0;
int retry_critical_attempts = 0;
struct timeval timeout = {};
timeout.tv_sec = 5;
timeout.tv_usec = 0;

xTaskCreate(stratum_primary_heartbeat, "stratum primary heartbeat", 4096, pvParameters, 1, NULL);
xTaskCreate(stratum_primary_heartbeat, "stratum primary heartbeat", 8192, pvParameters, 1, NULL);

ESP_LOGI(TAG, "Trying to get IP for URL: %s", stratum_url);
while (1) {
Expand Down Expand Up @@ -226,30 +265,33 @@ void stratum_task(void * pvParameters)
vTaskDelay(5000 / portTICK_PERIOD_MS);
continue;
}
retry_attempts = 0;

if (setsockopt(GLOBAL_STATE->sock, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)) != 0) {
if (setsockopt(GLOBAL_STATE->sock, SOL_SOCKET, SO_SNDTIMEO, &tcp_snd_timeout, sizeof(tcp_snd_timeout)) != 0) {
ESP_LOGE(TAG, "Fail to setsockopt SO_SNDTIMEO");
}

STRATUM_V1_reset_uid();
if (setsockopt(GLOBAL_STATE->sock, SOL_SOCKET, SO_RCVTIMEO , &tcp_rcv_timeout, sizeof(tcp_rcv_timeout)) != 0) {
ESP_LOGE(TAG, "Fail to setsockopt SO_RCVTIMEO ");
}

stratum_reset_uid(GLOBAL_STATE);
cleanQueue(GLOBAL_STATE);

///// Start Stratum Action
// mining.configure - ID: 1
STRATUM_V1_configure_version_rolling(GLOBAL_STATE->sock, &GLOBAL_STATE->version_mask);
STRATUM_V1_configure_version_rolling(GLOBAL_STATE->sock, GLOBAL_STATE->send_uid++, &GLOBAL_STATE->version_mask);

// mining.subscribe - ID: 2
STRATUM_V1_subscribe(GLOBAL_STATE->sock, GLOBAL_STATE->asic_model_str);
STRATUM_V1_subscribe(GLOBAL_STATE->sock, GLOBAL_STATE->send_uid++, GLOBAL_STATE->asic_model_str);

char * username = GLOBAL_STATE->SYSTEM_MODULE.is_using_fallback ? GLOBAL_STATE->SYSTEM_MODULE.fallback_pool_user : GLOBAL_STATE->SYSTEM_MODULE.pool_user;
char * password = GLOBAL_STATE->SYSTEM_MODULE.is_using_fallback ? GLOBAL_STATE->SYSTEM_MODULE.fallback_pool_pass : GLOBAL_STATE->SYSTEM_MODULE.pool_pass;

//mining.authorize - ID: 3
STRATUM_V1_authenticate(GLOBAL_STATE->sock, username, password);
STRATUM_V1_authenticate(GLOBAL_STATE->sock, GLOBAL_STATE->send_uid++, username, password);

//mining.suggest_difficulty - ID: 4
STRATUM_V1_suggest_difficulty(GLOBAL_STATE->sock, STRATUM_DIFFICULTY);
STRATUM_V1_suggest_difficulty(GLOBAL_STATE->sock, GLOBAL_STATE->send_uid++, STRATUM_DIFFICULTY);

// Everything is set up, lets make sure we don't abandon work unnecessarily.
GLOBAL_STATE->abandon_work = 0;
Expand All @@ -258,9 +300,11 @@ void stratum_task(void * pvParameters)
char * line = STRATUM_V1_receive_jsonrpc_line(GLOBAL_STATE->sock);
if (!line) {
ESP_LOGE(TAG, "Failed to receive JSON-RPC line, reconnecting...");
retry_attempts++;
stratum_close_connection(GLOBAL_STATE);
break;
}

ESP_LOGI(TAG, "rx: %s", line); // debug incoming stratum messages
STRATUM_V1_parse(&stratum_api_v1_message, line);
free(line);
Expand Down Expand Up @@ -304,6 +348,8 @@ void stratum_task(void * pvParameters)
SYSTEM_notify_rejected_share(GLOBAL_STATE);
}
} else if (stratum_api_v1_message.method == STRATUM_RESULT_SETUP) {
// Reset retry attempts after successfully receiving data.
retry_attempts = 0;
if (stratum_api_v1_message.response_success) {
ESP_LOGI(TAG, "setup message accepted");
} else {
Expand Down

0 comments on commit ec035a8

Please sign in to comment.