From f3e5c1110bc98daf7dc8e18bbb42babdb95bbc74 Mon Sep 17 00:00:00 2001 From: Oscar Laird Date: Sat, 24 Feb 2024 17:49:52 -0500 Subject: [PATCH] vacuuming --- src/pinecone.c | 30 ++++++++++++++++--- src/pinecone.h | 3 +- src/pinecone_api.c | 75 ++++++++++++++++++++++++++++++++++++---------- src/pinecone_api.h | 5 +++- 4 files changed, 91 insertions(+), 22 deletions(-) diff --git a/src/pinecone.c b/src/pinecone.c index bb4f48d1..381f9850 100644 --- a/src/pinecone.c +++ b/src/pinecone.c @@ -63,7 +63,7 @@ PineconeInit(void) "", NULL, // TODO you can pass a function pointer to a validation function here, instead of doing it manually in amoptions AccessExclusiveLock); - add_int_reloption(pinecone_relopt_kind, "buffer_threshold", // todo: this shouldn't be a reloption because then it can't be changed without a rebuild + add_int_reloption(pinecone_relopt_kind, "buffer_threshold", // todo: this shouldn't be a reloption because then it can't be changed without a rebuild of the index "Buffer Threshold value", PINECONE_DEFAULT_BUFFER_THRESHOLD, PINECONE_MIN_BUFFER_THRESHOLD, @@ -493,10 +493,32 @@ void InsertBufferTuple(Relation index, Datum *values, bool *isnull, ItemPointer UnlockReleaseBuffer(buf); } +ItemPointerData id_get_heap_tid(char *id) +{ + ItemPointerData heap_tid; + sscanf(id, "%02hx%02hx%02hx", &heap_tid.ip_blkid.bi_hi, &heap_tid.ip_blkid.bi_lo, &heap_tid.ip_posid); + return heap_tid; +} -IndexBulkDeleteResult *no_bulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, - IndexBulkDeleteCallback callback, void *callback_state) { return NULL; } +IndexBulkDeleteResult *pinecone_bulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, + IndexBulkDeleteCallback callback, void *callback_state) +{ + char* host = ReadMetaPage(info->index).host; + cJSON* ids_to_delete = cJSON_CreateArray(); + // yikes! pinecone makes you read your ids sequentially in pages of 100, so vacuuming a large index is going to be impossible + cJSON* json_vectors = pinecone_list_vectors(pinecone_api_key, ReadMetaPage(info->index).host, 100, NULL); + cJSON* json_vector; // todo: do I need to be freeing all my cJSON objects? + elog(DEBUG1, "vacuuming json_vectors: %s", cJSON_Print(json_vectors)); + cJSON_ArrayForEach(json_vector, json_vectors) { + char* id = cJSON_GetStringValue(cJSON_GetObjectItem(json_vector, "id")); + ItemPointerData heap_tid = id_get_heap_tid(id); + if (callback(&heap_tid, callback_state)) cJSON_AddItemToArray(ids_to_delete, cJSON_CreateString(id)); + } + elog(DEBUG1, "deleting ids: %s", cJSON_Print(ids_to_delete)); + pinecone_delete_vectors(pinecone_api_key, host, ids_to_delete); + return NULL; +} IndexBulkDeleteResult *no_vacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) { return NULL; } @@ -830,7 +852,7 @@ Datum pineconehandler(PG_FUNCTION_ARGS) amroutine->ambuild = pinecone_build; amroutine->ambuildempty = pinecone_buildempty; amroutine->aminsert = pinecone_insert; - amroutine->ambulkdelete = no_bulkdelete; + amroutine->ambulkdelete = pinecone_bulkdelete; amroutine->amvacuumcleanup = no_vacuumcleanup; // used to indicate if we support index-only scans; takes a attno and returns a bool; // included cols should always return true since there is little point in an included column if it can't be returned diff --git a/src/pinecone.h b/src/pinecone.h index 115da916..f4bb8600 100644 --- a/src/pinecone.h +++ b/src/pinecone.h @@ -69,7 +69,7 @@ extern bool pinecone_insert(Relation index, Datum *values, bool *isnull, ItemPoi , bool indexUnchanged #endif , IndexInfo *indexInfo); -extern IndexBulkDeleteResult *no_bulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, IndexBulkDeleteCallback callback, void *callback_state); +extern IndexBulkDeleteResult *pinecone_bulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, IndexBulkDeleteCallback callback, void *callback_state); extern IndexBulkDeleteResult *no_vacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats); extern void no_costestimate(PlannerInfo *root, IndexPath *path, double loop_count, Cost *indexStartupCost, Cost *indexTotalCost, Selectivity *indexSelectivity, double *indexCorrelation, double *indexPages); extern bytea * pinecone_options(Datum reloptions, bool validate); @@ -78,6 +78,7 @@ extern IndexScanDesc pinecone_beginscan(Relation index, int nkeys, int norderbys extern void pinecone_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys); extern bool pinecone_gettuple(IndexScanDesc scan, ScanDirection dir); extern void no_endscan(IndexScanDesc scan); +ItemPointerData id_get_heap_tid(char *id); // void CreateMetaPage(Relation index, int dimensions, int lists, int forkNum) diff --git a/src/pinecone_api.c b/src/pinecone_api.c index 99d0ce4c..ebb9a963 100644 --- a/src/pinecone_api.c +++ b/src/pinecone_api.c @@ -6,20 +6,20 @@ #include "postgres.h" size_t write_callback(char *contents, size_t size, size_t nmemb, void *userdata) { - size_t real_size = size * nmemb; // size of the response - // char **str = (char **)userdata; // cast the userdata to a string pointer - ResponseData *response_data = (ResponseData *)userdata; - if (response_data->data == NULL) { - response_data->data = malloc(real_size + 1); - memcpy(response_data->data, contents, real_size); - response_data->length = real_size; - } else { - response_data->data = realloc(response_data->data, response_data->length + real_size); - memcpy(response_data->data + response_data->length, contents, real_size); + size_t real_size = size * nmemb; // Size of the response + ResponseData *response_data = (ResponseData *)userdata; // Cast the userdata to the specific structure + + // Attempt to resize the buffer + char *new_data = realloc(response_data->data, response_data->length + real_size + 1); + if (new_data != NULL) { + response_data->data = new_data; + memcpy(response_data->data + response_data->length, contents, real_size); // Append new data response_data->length += real_size; + response_data->data[response_data->length] = '\0'; // Null terminate the string } - response_data->data[response_data->length] = '\0'; // null terminate the string + elog(DEBUG1, "Response (write_callback): %s", contents); + return real_size; } @@ -68,6 +68,37 @@ cJSON* list_indexes(const char *api_key) { return indexes; } +cJSON* generic_pinecone_request(const char *api_key, const char *url, const char *method, cJSON *body) { + CURL *hnd = curl_easy_init(); + ResponseData response_data = {NULL, 0}; + cJSON *response_json; + set_curl_options(hnd, api_key, url, method, &response_data); + if (strcmp(method, "POST") == 0 || strcmp(method, "PUT") == 0) { + curl_easy_setopt(hnd, CURLOPT_POSTFIELDS, cJSON_Print(body)); + } + curl_easy_perform(hnd); + response_json = cJSON_Parse(response_data.data); + return response_json; +} + +cJSON* pinecone_list_vectors(const char *api_key, const char *index_host, int limit, char* pagination_token) { + char url[300]; + if (pagination_token != NULL) { + sprintf(url, "https://%s/vectors/list?limit=%d&paginationToken=%s", index_host, limit, pagination_token); + } else { + sprintf(url, "https://%s/vectors/list?limit=%d", index_host, limit); + } + return cJSON_GetObjectItem(generic_pinecone_request(api_key, url, "GET", NULL), "vectors"); +} + +cJSON* pinecone_delete_vectors(const char *api_key, const char *index_host, cJSON *ids) { + cJSON *request = cJSON_CreateObject(); + char url[300]; + sprintf(url, "https://%s/vectors/delete", index_host); + cJSON_AddItemToObject(request, "ids", ids); + return generic_pinecone_request(api_key, url, "POST", request); +} + /* name, dimension, metric * serverless: cloud, region * pod: environment, replicas, pod_type, pods, shards, metadata_config @@ -79,6 +110,7 @@ cJSON* create_index(const char *api_key, const char *index_name, const int dimen cJSON *spec_json = cJSON_Parse(server_spec); ResponseData response_data = {NULL, 0}; cJSON *response_json; + CURLcode ret; // add fields to body elog(DEBUG1, "Creating index %s with dimension %d and metric %s", index_name, dimension, metric); cJSON_AddItemToObject(body, "name", cJSON_CreateString(index_name)); @@ -88,10 +120,15 @@ cJSON* create_index(const char *api_key, const char *index_name, const int dimen // set curl options set_curl_options(hnd, api_key, "https://api.pinecone.io/indexes", "POST", &response_data); curl_easy_setopt(hnd, CURLOPT_POSTFIELDS, cJSON_Print(body)); - curl_easy_perform(hnd); + ret = curl_easy_perform(hnd); + if (ret != CURLE_OK) { + elog(ERROR, "curl_easy_perform() failed: %s\n", curl_easy_strerror(ret)); + } curl_easy_cleanup(hnd); // return response_data as json response_json = cJSON_Parse(response_data.data); + // print the response + elog(DEBUG1, "Response (create_index): %s", cJSON_Print(response_json)); return response_json; } @@ -117,10 +154,15 @@ void pinecone_bulk_upsert(const char *api_key, const char *index_host, cJSON *ve cJSON *batch; CURLM *multi_handle = curl_multi_init(); cJSON *batch_handle; + // todo we need to free the actual data in response_data + ResponseData* response_data = palloc(sizeof(ResponseData) * cJSON_GetArraySize(batches)); int running; + int i = 0; cJSON_ArrayForEach(batch, batches) { - batch_handle = get_pinecone_upsert_handle(api_key, index_host, cJSON_Duplicate(batch, true)); // TODO: figure out why i have to deepcopy + response_data[i] = (ResponseData) {NULL, 0}; + batch_handle = get_pinecone_upsert_handle(api_key, index_host, cJSON_Duplicate(batch, true), &response_data[i]); // TODO: figure out why i have to deepcopy // because batch goes out of scope curl_multi_add_handle(multi_handle, batch_handle); + i++; } // run the handles curl_multi_perform(multi_handle, &running); @@ -139,14 +181,15 @@ void pinecone_bulk_upsert(const char *api_key, const char *index_host, cJSON *ve } -CURL* get_pinecone_upsert_handle(const char *api_key, const char *index_host, cJSON *vectors) { +CURL* get_pinecone_upsert_handle(const char *api_key, const char *index_host, cJSON *vectors, ResponseData* response_data) { CURL *hnd = curl_easy_init(); cJSON *body = cJSON_CreateObject(); char *body_str; - ResponseData response_data = {NULL, 0}; + // this isn't safe because response_data will go out of scope before the writeback function is called + // furthermore, we need to be sure to free the memory allocated for response_data.data (by the writeback) char url[100] = "https://"; strcat(url, index_host); strcat(url, "/vectors/upsert"); // https://t1-23kshha.svc.apw5-4e34-81fa.pinecone.io/vectors/upsert cJSON_AddItemToObject(body, "vectors", vectors); - set_curl_options(hnd, api_key, url, "POST", &response_data); + set_curl_options(hnd, api_key, url, "POST", response_data); body_str = cJSON_Print(body); curl_easy_setopt(hnd, CURLOPT_POSTFIELDS, body_str); return hnd; diff --git a/src/pinecone_api.h b/src/pinecone_api.h index 771457da..2ef2096c 100644 --- a/src/pinecone_api.h +++ b/src/pinecone_api.h @@ -22,9 +22,12 @@ cJSON* pinecone_api_query_index(const char *api_key, const char *index_host, con // void pinecone_upsert(const char *api_key, const char *index_host, cJSON *vectors); // bulk insertion -CURL* get_pinecone_upsert_handle(const char *api_key, const char *index_host, cJSON *vectors); +CURL* get_pinecone_upsert_handle(const char *api_key, const char *index_host, cJSON *vectors, ResponseData *response_data); cJSON* batch_vectors(cJSON *vectors, int batch_size); void pinecone_bulk_upsert(const char *api_key, const char *index_host, cJSON *vectors, int batch_size); size_t write_callback(char *ptr, size_t size, size_t nmemb, void *userdata); +cJSON* pinecone_list_vectors(const char *api_key, const char *index_host, int limit, char* pagination_token); +cJSON* generic_pinecone_request(const char *api_key, const char *url, const char *method, cJSON *body); +cJSON* pinecone_delete_vectors(const char *api_key, const char *index_host, cJSON *ids); #endif // PINECONE_API_H \ No newline at end of file