
Commit bdfa01e
cleanup and write todos
oscarlaird committed Feb 24, 2024
1 parent 4ff04cc commit bdfa01e
Showing 2 changed files with 58 additions and 137 deletions.
146 changes: 45 additions & 101 deletions src/pinecone.c
@@ -51,6 +51,7 @@ typedef struct PineconeOptions

char *pinecone_api_key = NULL;
int *pinecone_top_k = NULL;
// todo: principled batch sizes. Do we ever want the buffer to be bigger than a multi-insert? Possibly if we want to let the buffer fill up when the remote index is down.
static relopt_kind pinecone_relopt_kind;

void
@@ -62,49 +63,31 @@ PineconeInit(void)
"",
NULL, // TODO you can pass a function pointer to a validation function here, instead of doing it manually in amoptions
AccessExclusiveLock);
add_int_reloption(pinecone_relopt_kind, "buffer_threshold",
add_int_reloption(pinecone_relopt_kind, "buffer_threshold", // todo: this shouldn't be a reloption because then it can't be changed without a rebuild
"Buffer Threshold value",
PINECONE_DEFAULT_BUFFER_THRESHOLD,
PINECONE_MIN_BUFFER_THRESHOLD,
PINECONE_MAX_BUFFER_THRESHOLD,
AccessExclusiveLock);
DefineCustomStringVariable("pinecone.api_key",
"Pinecone API key",
"Pinecone API key",
&pinecone_api_key,
NULL,
DefineCustomStringVariable("pinecone.api_key", "Pinecone API key", "Pinecone API key",
&pinecone_api_key, NULL, // todo: this should default to an empty string because it is easier to deal with Not Authorized errors than it is to deal with segfaults; get rid of validate_api_key
PGC_SUSET, // restrict to superusers, takes immediate effect and is not saved in the configuration file
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("pinecone.top_k",
"topK parameter for pinecone queries",
"topK parameter for pinecone queries. Default is 10000. Lowering this value may improve performance but you will not be able to retrieve more than this number of results",
pinecone_top_k,
10000,
1,
10000,
0, NULL, NULL, NULL);
DefineCustomIntVariable("pinecone.top_k", "topK parameter for pinecone queries", "topK parameter for pinecone queries. Default is 10000. Lowering this value may improve performance but you will not be able to retrieve more than this number of results",
&pinecone_top_k,
10000, 1, 10000,
PGC_USERSET,
0,
NULL,
NULL,
NULL);
0, NULL, NULL, NULL);
MarkGUCPrefixReserved("pinecone");
}
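
The buffer_threshold todo above suggests a GUC rather than a reloption, so the value can change without an index rebuild. A minimal sketch under that assumption, reusing the existing PINECONE_*_BUFFER_THRESHOLD constants; note the backing variable is a plain int whose address is passed:

static int pinecone_buffer_threshold = PINECONE_DEFAULT_BUFFER_THRESHOLD;

// inside PineconeInit(), next to the other DefineCustom*Variable calls:
DefineCustomIntVariable("pinecone.buffer_threshold",
                        "Tuples to buffer locally before flushing to the remote index",
                        NULL, // no long description
                        &pinecone_buffer_threshold,
                        PINECONE_DEFAULT_BUFFER_THRESHOLD,
                        PINECONE_MIN_BUFFER_THRESHOLD,
                        PINECONE_MAX_BUFFER_THRESHOLD,
                        PGC_USERSET, // changeable per session, no rebuild required
                        0, NULL, NULL, NULL);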

VectorMetric
get_opclass_metric(Relation index)
{
FmgrInfo *procinfo;
Oid collation;
Datum datum;
VectorMetric metric;
procinfo = index_getprocinfo(index, 1, 2); // lookup the second support function in the opclass for the first attribute
collation = index->rd_indcollation[0]; // get the collation of the first attribute
datum = FunctionCall0Coll(procinfo, collation); // call the support function
metric = (VectorMetric) DatumGetInt32(datum);
return metric;
FmgrInfo *procinfo = index_getprocinfo(index, 1, 2); // lookup the second support function in the opclass for the first attribute
Oid collation = index->rd_indcollation[0]; // get the collation of the first attribute
Datum datum = FunctionCall0Coll(procinfo, collation); // call the support function
return (VectorMetric) DatumGetInt32(datum);
}


@@ -122,7 +105,6 @@ pinecone_build_callback(Relation index, ItemPointer tid, Datum *values, bool *is
TupleDesc itup_desc = index->rd_att;
cJSON *json_vector;
char vector_id[6 + 1]; // derive the vector_id from the heap_tid
// they have already deformed the heap tuple and extracted the values I want for me.
snprintf(vector_id, sizeof(vector_id), "%02x%02x%02x", tid->ip_blkid.bi_hi, tid->ip_blkid.bi_lo, tid->ip_posid);
elog(DEBUG1, "vector_id: %s", vector_id);
json_vector = tuple_get_pinecone_vector(itup_desc, values, isnull, vector_id);
@@ -149,7 +131,6 @@ IndexBuildResult *pinecone_build(Relation heap, Relation index, IndexInfo *index
int buffer_threshold;
int reltuples;
char *pinecone_index_name = (char *) palloc(100);
cJSON *describe_index_response;
PineconeOptions *opts = (PineconeOptions *) index->rd_options;
IndexBuildResult *result = palloc(sizeof(IndexBuildResult));
PineconeBuildState buildstate;
@@ -163,14 +144,14 @@ IndexBuildResult *pinecone_build(Relation heap, Relation index, IndexInfo *index
buffer_threshold = opts->buffer_threshold;
validate_api_key();
create_response = create_index(pinecone_api_key, pinecone_index_name, dimensions, pinecone_metric_name, spec);
// log the response host
host = cJSON_GetStringValue(cJSON_GetObjectItemCaseSensitive(create_response, "host"));
CreateMetaPage(index, dimensions, host, pinecone_index_name, buffer_threshold, metric, MAIN_FORKNUM);
// create buffer
CreateBufferHead(index, MAIN_FORKNUM);
// now we need to wait until the pinecone index is done initializing
while (true)
{
cJSON *describe_index_response;
elog(DEBUG1, "Waiting for remote index to initialize...");
sleep(1);
describe_index_response = describe_index(pinecone_api_key, pinecone_index_name);
@@ -185,7 +166,6 @@ IndexBuildResult *pinecone_build(Relation heap, Relation index, IndexInfo *index
buildstate.json_vectors = cJSON_CreateArray();

reltuples = table_index_build_scan(heap, index, indexInfo, true, true, pinecone_build_callback, (void *) &buildstate, NULL);

elog(DEBUG1, "BASE TABLE VECTORS: %s", cJSON_Print(buildstate.json_vectors));
pinecone_bulk_upsert(pinecone_api_key, host, buildstate.json_vectors, PINECONE_DEFAULT_BATCH_SIZE);
// stats
@@ -223,6 +203,7 @@ void CreateMetaPage(Relation index, int dimensions, char *host, char *pinecone_i
UnlockReleaseBuffer(buf);
}

// todo: this is too much boilerplate; I should just be able to get the page and release it
void incrMetaPageBufferFullness(Relation index)
{
Buffer buf;
@@ -239,6 +220,7 @@ void incrMetaPageBufferFullness(Relation index)
UnlockReleaseBuffer(buf);
}

// todo: see above
void setMetaPageBufferFullnessZero(Relation index)
{
Buffer buf;
@@ -287,9 +269,7 @@ PineconeMetaPageData ReadMetaPage(Relation index) {
return *metap;
}
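
Both boilerplate todos above point at the same refactor: a single helper that pins the meta page, applies a caller-supplied mutation, and handles the lock/WAL plumbing. A sketch, assuming a PINECONE_METAPAGE_BLKNO constant and a PineconePageGetMeta macro consistent with how this file reads the meta page:

static void
ModifyMetaPage(Relation index, void (*update) (PineconeMetaPageData *))
{
    Buffer buf;
    Page page;
    GenericXLogState *state;

    buf = ReadBuffer(index, PINECONE_METAPAGE_BLKNO);
    LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
    state = GenericXLogStart(index);
    page = GenericXLogRegisterBuffer(state, buf, 0);
    update(PineconePageGetMeta(page)); // caller-supplied mutation
    GenericXLogFinish(state);
    UnlockReleaseBuffer(buf);
}

// incrMetaPageBufferFullness and setMetaPageBufferFullnessZero then collapse to one-line callbacks:
static void incr_fullness(PineconeMetaPageData *meta) { meta->buffer_fullness++; }
static void zero_fullness(PineconeMetaPageData *meta) { meta->buffer_fullness = 0; }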

void pinecone_buildempty(Relation index)
{
}
void pinecone_buildempty(Relation index) {}

/*
* Insert a tuple into the index
@@ -305,12 +285,9 @@ bool pinecone_insert(Relation index, Datum *values, bool *isnull, ItemPointer he
{
PineconeMetaPageData pinecone_meta;
cJSON *json_vectors;
// pinecone_upsert_one(pinecone_api_key, pinecone_meta.host, json_vector);
// insert into the buffer refer to ivfflatinsert and the InsertTuple function in ivfinsert.c
InsertBufferTupleMemCtx(index, values, isnull, heap_tid, heap, checkUnique, indexInfo);
incrMetaPageBufferFullness(index);
pinecone_meta = ReadMetaPage(index);
elog(DEBUG1, "Buffer fullness: %d", pinecone_meta.buffer_fullness);
// if the buffer is full, flush it to the remote index
if (pinecone_meta.buffer_fullness == pinecone_meta.buffer_threshold) {
elog(DEBUG1, "Buffer fullness = %d, flushing to remote index", pinecone_meta.buffer_fullness);
@@ -345,19 +322,12 @@ void clear_buffer(Relation index)
elog(DEBUG1, "deleted %d items", nitems);
// get the next page
currentblkno = PineconePageGetOpaque(page)->nextblkno;
if (BlockNumberIsValid(currentblkno))
{
// release the current buffer
UnlockReleaseBuffer(buf);
// get the next buffer
buf = ReadBuffer(index, currentblkno);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
}
else
{
break;
}
if (!BlockNumberIsValid(currentblkno)) break;
UnlockReleaseBuffer(buf);
// get the next buffer
buf = ReadBuffer(index, currentblkno);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
}
UnlockReleaseBuffer(buf);
}
@@ -383,19 +353,13 @@ cJSON* get_buffer_pinecone_vectors(Relation index)
}
// get the next page
currentblkno = PineconePageGetOpaque(page)->nextblkno;
if (BlockNumberIsValid(currentblkno))
{
// release the current buffer
UnlockReleaseBuffer(buf);
// get the next buffer
buf = ReadBuffer(index, currentblkno);
LockBuffer(buf, BUFFER_LOCK_SHARE);
page = BufferGetPage(buf);
}
else
{
break;
}
if (!BlockNumberIsValid(currentblkno)) break;
// release the current buffer
UnlockReleaseBuffer(buf);
// get the next buffer
buf = ReadBuffer(index, currentblkno);
LockBuffer(buf, BUFFER_LOCK_SHARE);
page = BufferGetPage(buf);
}
UnlockReleaseBuffer(buf);
return json_vectors;
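
clear_buffer and get_buffer_pinecone_vectors repeat the same nextblkno walk, and a later todo asks for a callback-driven traversal. A sketch of that shared iterator, assuming PINECONE_BUFFER_HEAD_BLKNO and PineconePageGetOpaque as used above (callers that modify pages would need BUFFER_LOCK_EXCLUSIVE and generic WAL instead):

typedef void (*buffer_tuple_callback) (Relation index, IndexTuple itup, void *arg);

static void
buffer_chain_foreach(Relation index, buffer_tuple_callback cb, void *arg)
{
    BlockNumber blkno = PINECONE_BUFFER_HEAD_BLKNO;

    while (BlockNumberIsValid(blkno))
    {
        Buffer buf = ReadBuffer(index, blkno);
        Page page;
        OffsetNumber off, maxoff;

        LockBuffer(buf, BUFFER_LOCK_SHARE);
        page = BufferGetPage(buf);
        maxoff = PageGetMaxOffsetNumber(page);
        for (off = FirstOffsetNumber; off <= maxoff; off = OffsetNumberNext(off))
            cb(index, (IndexTuple) PageGetItem(page, PageGetItemId(page, off)), arg);
        blkno = PineconePageGetOpaque(page)->nextblkno; // advance before releasing
        UnlockReleaseBuffer(buf);
    }
}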
Expand All @@ -408,13 +372,10 @@ cJSON* tuple_get_pinecone_vector(TupleDesc tup_desc, Datum *values, bool *isnull
Vector *vector;
cJSON *json_values;
vector = DatumGetVector(values[0]);
if (vector_eq_zero_internal(vector))
if (vector_eq_zero_internal(vector)) // todo: factor this out
{
// tell the user that the vector is an invalid datavalue because it contains a zero vector
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Invalid vector: zero vector"),
errhint("Pinecone dense vectors cannot be zero in all dimensions")));
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("Invalid vector: zero vector"), errhint("Pinecone dense vectors cannot be zero in all dimensions")));
}
json_values = cJSON_CreateFloatArray(vector->x, vector->dim);
// prepare metadata
@@ -447,7 +408,7 @@ cJSON* index_tuple_get_pinecone_vector(Relation index, IndexTuple itup) {
Datum *itup_values = (Datum *) palloc(sizeof(Datum) * natts);
bool *itup_isnull = (bool *) palloc(sizeof(bool) * natts);
TupleDesc itup_desc = index->rd_att;
char vector_id[6 + 1]; // derive the vector_id from the heap_tid
char vector_id[6 + 1]; // derive the vector_id from the heap_tid // todo: this code is duplicated
index_deform_tuple(itup, itup_desc, itup_values, itup_isnull);
snprintf(vector_id, sizeof(vector_id), "%02x%02x%02x", itup->t_tid.ip_blkid.bi_hi, itup->t_tid.ip_blkid.bi_lo, itup->t_tid.ip_posid);
return tuple_get_pinecone_vector(itup_desc, itup_values, itup_isnull, vector_id);
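
The duplication flagged in the todo above (the same snprintf appears in pinecone_build_callback) is a one-helper fix; a sketch with a hypothetical name:

static void
pinecone_id_from_tid(ItemPointer tid, char vector_id[6 + 1])
{
    // 6 hex digits + NUL, matching the format used at both call sites
    snprintf(vector_id, 6 + 1, "%02x%02x%02x",
             tid->ip_blkid.bi_hi, tid->ip_blkid.bi_lo, tid->ip_posid);
}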
@@ -476,23 +437,14 @@ void InsertBufferTuple(Relation index, Datum *values, bool *isnull, ItemPointer
Page page;
GenericXLogState *state;
bool success;
// detoast the values
// for (int i = 0; i < index->rd_att->natts; i++)
// {
// if (isnull[i]) continue;
// if (TupleDescAttr(index->rd_att, i)->attlen == -1)
// {
// values[i] = PointerGetDatum(PG_DETOAST_DATUM(values[i]));
// }
// }
// form tuple
itup = index_form_tuple(RelationGetDescr(index), values, isnull);
itup->t_tid = *heap_tid;
// find insert page
insertPage = PINECONE_BUFFER_HEAD_BLKNO;
// get the size of the tuple
itemsz = MAXALIGN(IndexTupleSize(itup));
// look for the first page in the chain which has enough space to fit the tuple
// look for the first page in the chain which has enough space to fit the tuple // todo: there is a better way to do this (see the free-space-map sketch after the vacuum stubs below)
while (true)
{
buf = ReadBuffer(index, insertPage);
@@ -544,22 +496,17 @@


IndexBulkDeleteResult *no_bulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
IndexBulkDeleteCallback callback, void *callback_state)
{
return NULL;
}
IndexBulkDeleteCallback callback, void *callback_state) { return NULL; }

IndexBulkDeleteResult *no_vacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
return NULL;
}
IndexBulkDeleteResult *no_vacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) { return NULL; }
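
The "better way" hinted at in InsertBufferTuple's todo is plausibly PostgreSQL's free space map: record each buffer page's free space as it changes, then ask the FSM for a fitting page instead of scanning the chain from the head. A hedged sketch, assuming the buffer pages are registered in the FSM when they are created:

#include "storage/freespace.h"

static BlockNumber
find_insert_page(Relation index, Size itemsz)
{
    BlockNumber blkno = GetPageWithFreeSpace(index, itemsz);

    if (!BlockNumberIsValid(blkno))
        blkno = PINECONE_BUFFER_HEAD_BLKNO; // fall back to the head of the chain
    return blkno;
}

// and after each successful PageAddItem, keep the map current:
// RecordPageWithFreeSpace(index, blkno, PageGetFreeSpace(page));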

void
no_costestimate(PlannerInfo *root, IndexPath *path, double loop_count,
Cost *indexStartupCost, Cost *indexTotalCost,
Selectivity *indexSelectivity, double *indexCorrelation,
double *indexPages)
{
// todo: consider running a health check on the remote index and return infinity if it is not healthy
if (list_length(path->indexorderbycols) == 0 || linitial_int(path->indexorderbycols) != 0) {
elog(DEBUG1, "Index must be ordered by the first column");
*indexTotalCost = 1000000;
Expand All @@ -568,7 +515,7 @@ no_costestimate(PlannerInfo *root, IndexPath *path, double loop_count,
};



// todo: use a validate callback in add_reloption
bytea * pinecone_options(Datum reloptions, bool validate)
{
PineconeOptions *opts;
@@ -597,11 +544,7 @@ bytea * pinecone_options(Datum reloptions, bool validate)
return (bytea *) opts;
}

bool
no_validate(Oid opclassoid)
{
return true;
}
bool no_validate(Oid opclassoid) { return true; }

/*
* Prepare for an index scan
@@ -629,21 +572,20 @@

// prep sort
// TODO allocate 10MB for the sort (we probably need far less)
so->sortstate = tuplesort_begin_heap(so->tupdesc, 1, attNums, sortOperators, sortCollations, nullsFirstFlags, *pinecone_top_k, NULL, false);
so->sortstate = tuplesort_begin_heap(so->tupdesc, 1, attNums, sortOperators, sortCollations, nullsFirstFlags, 10000, NULL, false);
so->slot = MakeSingleTupleTableSlot(so->tupdesc, &TTSOpsMinimalTuple);
//
scan->opaque = so;
// log scan->opaque
return scan;
}

/*
* Start or restart an index scan
* todo: can we reuse a connection created in pinecone_beginscan?
*/
void
pinecone_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
{
// {"$and": [{"flag": {"$eq": true}}, {"price": {"$lt": 10}}]} // example filter
Vector * vec;
cJSON *query_vector_values;
cJSON *pinecone_response;
@@ -657,21 +599,21 @@ pinecone_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, i
// double tuples = 0;
// filter
const char* pinecone_filter_operators[] = {"$lt", "$lte", "$eq", "$gte", "$gt", "$ne"};
cJSON *filter;
cJSON *filter; // {"$and": [{"flag": {"$eq": true}}, {"price": {"$lt": 10}}]} // example filter
cJSON *and_list;
// log the metadata
elog(DEBUG1, "nkeys: %d", nkeys);
pinecone_metadata = ReadMetaPage(scan->indexRelation);
so->dimensions = pinecone_metadata.dimensions;
so->metric = pinecone_metadata.metric;


if (scan->numberOfOrderBys == 0 || orderbys[0].sk_attno != 1) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Index must be ordered by the first column")));
}

// todo: factor out building the filter
// build the filter
filter = cJSON_CreateObject();
and_list = cJSON_CreateArray();
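
A sketch of the factored-out filter builder the todo asks for: map each ScanKey to one {"attname": {"$op": value}} clause and AND them together. This assumes the operator array is indexed by btree strategy number and, for brevity, integer-valued keys; real code would dispatch on the key's type:

static cJSON *
build_pinecone_filter(Relation index, ScanKey keys, int nkeys)
{
    cJSON *filter = cJSON_CreateObject();
    cJSON *and_list = cJSON_CreateArray();

    for (int i = 0; i < nkeys; i++)
    {
        char *attname = NameStr(TupleDescAttr(RelationGetDescr(index),
                                              keys[i].sk_attno - 1)->attname);
        cJSON *clause = cJSON_CreateObject();
        cJSON *op = cJSON_CreateObject();

        cJSON_AddItemToObject(op, pinecone_filter_operators[keys[i].sk_strategy - 1],
                              cJSON_CreateNumber(DatumGetInt32(keys[i].sk_argument)));
        cJSON_AddItemToObject(clause, attname, op);
        cJSON_AddItemToArray(and_list, clause);
    }
    cJSON_AddItemToObject(filter, "$and", and_list);
    return filter;
}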
@@ -729,6 +671,8 @@ pinecone_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, i
if (!IsMVCCSnapshot(scan->xs_snapshot))
elog(ERROR, "non-MVCC snapshots are not supported with pinecone");

// todo: there may be postgres helpers for iterating thru the buffer and using a callback to add to the sortstate
// todo: factor this out
// ADD BUFFER TO THE SORT AND PERFORM THE SORT
// TODO skip normalization for now
// TODO create the sortstate
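
With the buffer_chain_foreach sketch from earlier, the buffer-to-sort step the todos above describe could become a callback; PineconeScanOpaque is assumed to be the scan-opaque type used above, and the tuple-to-slot conversion is elided because it depends on so->tupdesc:

static void
add_buffer_tuple_to_sort(Relation index, IndexTuple itup, void *arg)
{
    PineconeScanOpaque so = (PineconeScanOpaque) arg;

    // deform itup into so->slot here (layout depends on so->tupdesc) ...
    tuplesort_puttupleslot(so->sortstate, so->slot);
}

// buffer_chain_foreach(scan->indexRelation, add_buffer_tuple_to_sort, so);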
