Skip to content

Commit

Permalink
vacuuming
Browse files Browse the repository at this point in the history
  • Loading branch information
oscarlaird committed Feb 24, 2024
1 parent b861c24 commit f3e5c11
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 22 deletions.
30 changes: 26 additions & 4 deletions src/pinecone.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ PineconeInit(void)
"",
NULL, // TODO you can pass a function pointer to a validation function here, instead of doing it manually in amoptions
AccessExclusiveLock);
add_int_reloption(pinecone_relopt_kind, "buffer_threshold", // todo: this shouldn't be a reloption because then it can't be changed without a rebuild
add_int_reloption(pinecone_relopt_kind, "buffer_threshold", // todo: this shouldn't be a reloption because then it can't be changed without a rebuild of the index
"Buffer Threshold value",
PINECONE_DEFAULT_BUFFER_THRESHOLD,
PINECONE_MIN_BUFFER_THRESHOLD,
Expand Down Expand Up @@ -493,10 +493,32 @@ void InsertBufferTuple(Relation index, Datum *values, bool *isnull, ItemPointer
UnlockReleaseBuffer(buf);
}

/*
 * Decode a Pinecone vector id string back into the heap ItemPointerData
 * it encodes: three hex fields for bi_hi, bi_lo (block id halves) and
 * ip_posid (offset number), in that order.
 *
 * NOTE(review): the width specifier 2 in "%02hx" makes sscanf consume at
 * most two hex digits per field, so any 16-bit component > 0xFF cannot
 * round-trip — confirm against the (unseen) id-encoding routine whether
 * fields are fixed at two digits, and widen to %04hx if it emits four.
 * NOTE(review): the sscanf return value is unchecked; a malformed id
 * silently yields a partially-uninitialized TID.
 */
ItemPointerData id_get_heap_tid(char *id)
{
    ItemPointerData heap_tid;
    sscanf(id, "%02hx%02hx%02hx", &heap_tid.ip_blkid.bi_hi, &heap_tid.ip_blkid.bi_lo, &heap_tid.ip_posid);
    return heap_tid;
}


IndexBulkDeleteResult *no_bulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
IndexBulkDeleteCallback callback, void *callback_state) { return NULL; }
/*
 * ambulkdelete callback: list vector ids stored in the Pinecone index,
 * ask the vacuum callback which heap TIDs are dead, and delete the
 * corresponding vectors remotely.
 *
 * NOTE(review): pinecone's list endpoint pages ids 100 at a time and only
 * the first page is fetched here, so a large index is never fully
 * vacuumed; pagination needs to be threaded through pinecone_list_vectors.
 * Returns NULL (no IndexBulkDeleteResult stats are collected).
 */
IndexBulkDeleteResult *pinecone_bulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
                      IndexBulkDeleteCallback callback, void *callback_state)
{
    char *host = ReadMetaPage(info->index).host; /* read the meta page once and reuse */
    cJSON *ids_to_delete = cJSON_CreateArray();
    cJSON *json_vectors;
    cJSON *json_vector;
    char *debug_str;

    json_vectors = pinecone_list_vectors(pinecone_api_key, host, 100, NULL);

    /* cJSON_Print allocates; free with cJSON_free to avoid leaking per-vacuum */
    debug_str = cJSON_Print(json_vectors);
    elog(DEBUG1, "vacuuming json_vectors: %s", debug_str);
    cJSON_free(debug_str);

    cJSON_ArrayForEach(json_vector, json_vectors) {
        char *id = cJSON_GetStringValue(cJSON_GetObjectItem(json_vector, "id"));
        ItemPointerData heap_tid = id_get_heap_tid(id);
        /* callback returns true when the heap tuple is dead */
        if (callback(&heap_tid, callback_state))
            cJSON_AddItemToArray(ids_to_delete, cJSON_CreateString(id));
    }

    debug_str = cJSON_Print(ids_to_delete);
    elog(DEBUG1, "deleting ids: %s", debug_str);
    cJSON_free(debug_str);

    /* ids_to_delete ownership passes to the delete request */
    pinecone_delete_vectors(pinecone_api_key, host, ids_to_delete);
    return NULL;
}

/*
 * amvacuumcleanup stub: no post-vacuum cleanup is performed for this
 * access method; returning NULL tells Postgres there are no stats.
 */
IndexBulkDeleteResult *no_vacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
    return NULL;
}

Expand Down Expand Up @@ -830,7 +852,7 @@ Datum pineconehandler(PG_FUNCTION_ARGS)
amroutine->ambuild = pinecone_build;
amroutine->ambuildempty = pinecone_buildempty;
amroutine->aminsert = pinecone_insert;
amroutine->ambulkdelete = no_bulkdelete;
amroutine->ambulkdelete = pinecone_bulkdelete;
amroutine->amvacuumcleanup = no_vacuumcleanup;
// used to indicate if we support index-only scans; takes a attno and returns a bool;
// included cols should always return true since there is little point in an included column if it can't be returned
Expand Down
3 changes: 2 additions & 1 deletion src/pinecone.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ extern bool pinecone_insert(Relation index, Datum *values, bool *isnull, ItemPoi
, bool indexUnchanged
#endif
, IndexInfo *indexInfo);
extern IndexBulkDeleteResult *no_bulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, IndexBulkDeleteCallback callback, void *callback_state);
extern IndexBulkDeleteResult *pinecone_bulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, IndexBulkDeleteCallback callback, void *callback_state);
extern IndexBulkDeleteResult *no_vacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats);
extern void no_costestimate(PlannerInfo *root, IndexPath *path, double loop_count, Cost *indexStartupCost, Cost *indexTotalCost, Selectivity *indexSelectivity, double *indexCorrelation, double *indexPages);
extern bytea * pinecone_options(Datum reloptions, bool validate);
Expand All @@ -78,6 +78,7 @@ extern IndexScanDesc pinecone_beginscan(Relation index, int nkeys, int norderbys
extern void pinecone_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys);
extern bool pinecone_gettuple(IndexScanDesc scan, ScanDirection dir);
extern void no_endscan(IndexScanDesc scan);
ItemPointerData id_get_heap_tid(char *id);


// void CreateMetaPage(Relation index, int dimensions, int lists, int forkNum)
Expand Down
75 changes: 59 additions & 16 deletions src/pinecone_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@
#include "postgres.h"

/*
 * libcurl CURLOPT_WRITEFUNCTION callback: append the received chunk to the
 * ResponseData accumulator in *userdata, keeping the buffer NUL-terminated.
 * Returns the number of bytes consumed; returning 0 on allocation failure
 * makes libcurl abort the transfer (CURLE_WRITE_ERROR) instead of silently
 * dropping data.
 */
size_t write_callback(char *contents, size_t size, size_t nmemb, void *userdata) {
    size_t real_size = size * nmemb;                        // size of this chunk
    ResponseData *response_data = (ResponseData *)userdata; // accumulator
    char *new_data;

    /* Grow the buffer; +1 for the NUL terminator. realloc(NULL, n) acts as
     * malloc(n), so the first chunk needs no special case. */
    new_data = realloc(response_data->data, response_data->length + real_size + 1);
    if (new_data == NULL) {
        /* Allocation failed: report a short write so libcurl aborts,
         * rather than claiming success while discarding the chunk. */
        return 0;
    }
    response_data->data = new_data;
    memcpy(response_data->data + response_data->length, contents, real_size);
    response_data->length += real_size;
    response_data->data[response_data->length] = '\0';

    /* Log the accumulated, NUL-terminated buffer; `contents` itself is NOT
     * NUL-terminated, so printing it with %s would overread. */
    elog(DEBUG1, "Response (write_callback): %s", response_data->data);

    return real_size;
}

Expand Down Expand Up @@ -68,6 +68,37 @@ cJSON* list_indexes(const char *api_key) {
return indexes;
}

/*
 * Perform one synchronous HTTP request against the Pinecone API and parse
 * the JSON response. `body` is serialized and sent only for POST/PUT and
 * remains owned by the caller. Returns a parsed cJSON tree the caller must
 * cJSON_Delete(), or NULL if the response was empty/unparseable.
 * Raises ERROR (consistent with create_index) when the transfer itself fails.
 */
cJSON* generic_pinecone_request(const char *api_key, const char *url, const char *method, cJSON *body) {
    CURL *hnd = curl_easy_init();
    ResponseData response_data = {NULL, 0};
    cJSON *response_json;
    char *body_str = NULL;
    CURLcode ret;

    set_curl_options(hnd, api_key, url, method, &response_data);
    if (body != NULL && (strcmp(method, "POST") == 0 || strcmp(method, "PUT") == 0)) {
        /* keep a reference: CURLOPT_POSTFIELDS does not copy the string,
         * so it must stay alive until curl_easy_perform returns */
        body_str = cJSON_Print(body);
        curl_easy_setopt(hnd, CURLOPT_POSTFIELDS, body_str);
    }
    ret = curl_easy_perform(hnd);
    response_json = cJSON_Parse(response_data.data); /* cJSON_Parse tolerates NULL */

    /* release resources that were previously leaked on every request */
    curl_easy_cleanup(hnd);
    if (body_str != NULL)
        cJSON_free(body_str);
    free(response_data.data); /* allocated by write_callback via realloc */

    if (ret != CURLE_OK) {
        cJSON_Delete(response_json);
        elog(ERROR, "curl_easy_perform() failed: %s", curl_easy_strerror(ret));
    }
    return response_json;
}

/*
 * List up to `limit` vector ids from the index, optionally resuming from
 * `pagination_token` (pass NULL for the first page). Returns the detached
 * "vectors" array, which the caller now owns and must cJSON_Delete(), or
 * NULL on failure.
 *
 * NOTE(review): pagination_token is interpolated into the URL without
 * escaping — confirm Pinecone tokens are always URL-safe.
 */
cJSON* pinecone_list_vectors(const char *api_key, const char *index_host, int limit, char* pagination_token) {
    char url[300];
    cJSON *response;
    cJSON *vectors;

    /* snprintf instead of sprintf: never overflow the fixed url buffer */
    if (pagination_token != NULL) {
        snprintf(url, sizeof url, "https://%s/vectors/list?limit=%d&paginationToken=%s", index_host, limit, pagination_token);
    } else {
        snprintf(url, sizeof url, "https://%s/vectors/list?limit=%d", index_host, limit);
    }
    response = generic_pinecone_request(api_key, url, "GET", NULL);
    /* detach the "vectors" array so the rest of the response tree can be
     * freed here instead of leaking it (both calls tolerate NULL) */
    vectors = cJSON_DetachItemFromObject(response, "vectors");
    cJSON_Delete(response);
    return vectors;
}

/*
 * Delete the given vector ids from the index. Takes ownership of `ids`:
 * the array is attached to the request body and freed along with it, so
 * the caller must not use `ids` after this call. Returns the parsed API
 * response, which the caller must cJSON_Delete().
 */
cJSON* pinecone_delete_vectors(const char *api_key, const char *index_host, cJSON *ids) {
    cJSON *request = cJSON_CreateObject();
    cJSON *response;
    char url[300];

    /* snprintf instead of sprintf: never overflow the fixed url buffer */
    snprintf(url, sizeof url, "https://%s/vectors/delete", index_host);
    cJSON_AddItemToObject(request, "ids", ids); /* request now owns ids */
    response = generic_pinecone_request(api_key, url, "POST", request);
    cJSON_Delete(request); /* also frees ids; the request tree was leaked before */
    return response;
}

/* name, dimension, metric
* serverless: cloud, region
* pod: environment, replicas, pod_type, pods, shards, metadata_config
Expand All @@ -79,6 +110,7 @@ cJSON* create_index(const char *api_key, const char *index_name, const int dimen
cJSON *spec_json = cJSON_Parse(server_spec);
ResponseData response_data = {NULL, 0};
cJSON *response_json;
CURLcode ret;
// add fields to body
elog(DEBUG1, "Creating index %s with dimension %d and metric %s", index_name, dimension, metric);
cJSON_AddItemToObject(body, "name", cJSON_CreateString(index_name));
Expand All @@ -88,10 +120,15 @@ cJSON* create_index(const char *api_key, const char *index_name, const int dimen
// set curl options
set_curl_options(hnd, api_key, "https://api.pinecone.io/indexes", "POST", &response_data);
curl_easy_setopt(hnd, CURLOPT_POSTFIELDS, cJSON_Print(body));
curl_easy_perform(hnd);
ret = curl_easy_perform(hnd);
if (ret != CURLE_OK) {
elog(ERROR, "curl_easy_perform() failed: %s\n", curl_easy_strerror(ret));
}
curl_easy_cleanup(hnd);
// return response_data as json
response_json = cJSON_Parse(response_data.data);
// print the response
elog(DEBUG1, "Response (create_index): %s", cJSON_Print(response_json));
return response_json;
}

Expand All @@ -117,10 +154,15 @@ void pinecone_bulk_upsert(const char *api_key, const char *index_host, cJSON *ve
cJSON *batch;
CURLM *multi_handle = curl_multi_init();
cJSON *batch_handle;
// todo we need to free the actual data in response_data
ResponseData* response_data = palloc(sizeof(ResponseData) * cJSON_GetArraySize(batches));
int running;
int i = 0;
cJSON_ArrayForEach(batch, batches) {
batch_handle = get_pinecone_upsert_handle(api_key, index_host, cJSON_Duplicate(batch, true)); // TODO: figure out why i have to deepcopy
response_data[i] = (ResponseData) {NULL, 0};
batch_handle = get_pinecone_upsert_handle(api_key, index_host, cJSON_Duplicate(batch, true), &response_data[i]); // TODO: figure out why i have to deepcopy // because batch goes out of scope
curl_multi_add_handle(multi_handle, batch_handle);
i++;
}
// run the handles
curl_multi_perform(multi_handle, &running);
Expand All @@ -139,14 +181,15 @@ void pinecone_bulk_upsert(const char *api_key, const char *index_host, cJSON *ve
}


CURL* get_pinecone_upsert_handle(const char *api_key, const char *index_host, cJSON *vectors) {
CURL* get_pinecone_upsert_handle(const char *api_key, const char *index_host, cJSON *vectors, ResponseData* response_data) {
CURL *hnd = curl_easy_init();
cJSON *body = cJSON_CreateObject();
char *body_str;
ResponseData response_data = {NULL, 0};
// this isn't safe because response_data will go out of scope before the writeback function is called
// furthermore, we need to be sure to free the memory allocated for response_data.data (by the writeback)
char url[100] = "https://"; strcat(url, index_host); strcat(url, "/vectors/upsert"); // https://t1-23kshha.svc.apw5-4e34-81fa.pinecone.io/vectors/upsert
cJSON_AddItemToObject(body, "vectors", vectors);
set_curl_options(hnd, api_key, url, "POST", &response_data);
set_curl_options(hnd, api_key, url, "POST", response_data);
body_str = cJSON_Print(body);
curl_easy_setopt(hnd, CURLOPT_POSTFIELDS, body_str);
return hnd;
Expand Down
5 changes: 4 additions & 1 deletion src/pinecone_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ cJSON* pinecone_api_query_index(const char *api_key, const char *index_host, con
// void pinecone_upsert(const char *api_key, const char *index_host, cJSON *vectors);

// bulk insertion
CURL* get_pinecone_upsert_handle(const char *api_key, const char *index_host, cJSON *vectors);
CURL* get_pinecone_upsert_handle(const char *api_key, const char *index_host, cJSON *vectors, ResponseData *response_data);
cJSON* batch_vectors(cJSON *vectors, int batch_size);
void pinecone_bulk_upsert(const char *api_key, const char *index_host, cJSON *vectors, int batch_size);
size_t write_callback(char *ptr, size_t size, size_t nmemb, void *userdata);
cJSON* pinecone_list_vectors(const char *api_key, const char *index_host, int limit, char* pagination_token);
cJSON* generic_pinecone_request(const char *api_key, const char *url, const char *method, cJSON *body);
cJSON* pinecone_delete_vectors(const char *api_key, const char *index_host, cJSON *ids);

#endif // PINECONE_API_H

0 comments on commit f3e5c11

Please sign in to comment.