Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compatlib_function - ConnectBy supports VARCHAR inputs #70

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions compatlib_functions/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

SDK?=/opt/vertica/sdk
VSQL?=vsql
COMPATLIB_DEBUG = 0

VERTICA_SDK_INCLUDE = $(SDK)/include
SIMULATOR_PATH = $(SDK)/simulator
Expand All @@ -23,7 +24,7 @@ BUILD_FILES = build/Vertica.o \
PACKAGE_LIBNAME = lib/CompatLib.so

CXX=g++
CXXFLAGS=-g -D HAVE_LONG_LONG_INT_64 -c -I ../include -Wall -Wno-unused-value -fPIC -I $(VERTICA_SDK_INCLUDE) -I $(THIRD_PARTY_INCLUDE)
CXXFLAGS=-g -DHAVE_LONG_LONG_INT_64 -D_GLIBCXX_USE_CXX11_ABI=0 -DCOMPATLIB_DEBUG=$(COMPATLIB_DEBUG) -c -I ../include -Wall -Wno-unused-value -fPIC -std=c++11 -I $(VERTICA_SDK_INCLUDE) -I $(THIRD_PARTY_INCLUDE)
LDFLAGS=-shared

# add optimization if not a debug build
Expand All @@ -37,9 +38,9 @@ endif
all: $(PACKAGE_LIBNAME)

# Main target that builds the package library
$(PACKAGE_LIBNAME): $(BUILD_FILES)
$(PACKAGE_LIBNAME): $(BUILD_FILES)
mkdir -p lib
$(CXX) $(LDFLAGS) -o $@ $(BUILD_FILES)
$(CXX) $(LDFLAGS) -o $@ $(BUILD_FILES)

# rule to make build/XXX.o from src/XXX.cpp
build/%.o: src/%.cpp
Expand All @@ -54,7 +55,7 @@ build/Vertica.o: $(VERTICA_SDK_INCLUDE)/Vertica.cpp
# example rule to make build/XX.o from third-party/src/*.c
#build/%.o: $(THIRD_PARTY)/src/%.c
# @mkdir -p build
# $(CXX) $(CXXFLAGS) $< -o $@
# $(CXX) $(CXXFLAGS) $< -o $@


# Targets to install and uninstall the library and functions
Expand Down Expand Up @@ -88,5 +89,4 @@ sim_test: all simulator
# build the simulator (in SIMULATOR_PATH) and symlink it here
simulator:
$(MAKE) -C $(SIMULATOR_PATH)
ln -f -s $(SIMULATOR_PATH)/vsim

ln -f -s $(SIMULATOR_PATH)/vsim
4 changes: 2 additions & 2 deletions compatlib_functions/ddl/install.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ select version();
\set libfile '\''`pwd`'/lib/CompatLib.so\'';
CREATE LIBRARY CompatLib AS :libfile;

CREATE TRANSFORM FUNCTION connect_by_path AS LANGUAGE 'C++' NAME 'ConnectByFactory' LIBRARY CompatLib;
CREATE TRANSFORM FUNCTION connect_by_path AS LANGUAGE 'C++' NAME 'ConnectByIntFactory' LIBRARY CompatLib;
CREATE TRANSFORM FUNCTION connect_by_path AS LANGUAGE 'C++' NAME 'ConnectByVarcharFactory' LIBRARY CompatLib;
CREATE TRANSFORM FUNCTION transpose AS LANGUAGE 'C++' NAME 'TransposeFactory' LIBRARY CompatLib;

-- Add more as needed
CREATE TRANSFORM FUNCTION group_generator_3 AS LANGUAGE 'C++' NAME 'GroupGeneratorFactoryVVV' LIBRARY CompatLib;
CREATE TRANSFORM FUNCTION group_generator_3 AS LANGUAGE 'C++' NAME 'GroupGeneratorFactoryFVF' LIBRARY CompatLib;
CREATE TRANSFORM FUNCTION group_generator_4 AS LANGUAGE 'C++' NAME 'GroupGeneratorFactoryVVVV' LIBRARY CompatLib;
CREATE TRANSFORM FUNCTION group_generator_4 AS LANGUAGE 'C++' NAME 'GroupGeneratorFactoryFVFV' LIBRARY CompatLib;

2 changes: 2 additions & 0 deletions compatlib_functions/examples/connect_by_path.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ commit;

select connect_by_path(supervisor_id, id, name, ' >> ') over () from company;

select connect_by_path(supervisor_id::VARCHAR, id::VARCHAR, name, ' >> ') over () from company;

drop table company;
192 changes: 153 additions & 39 deletions compatlib_functions/src/ConnectBy.cpp
Original file line number Diff line number Diff line change
@@ -1,55 +1,132 @@
#include "Vertica.h"
#include <sstream>
#include <string>
#include <map>

using namespace Vertica;
using namespace std;


const int WIDTH = 2000;
const int WIDTH = 65000;

class ConnectBy : public TransformFunction
{
// Records the type OID of argument 0 (the parent ID) in argTypeOID.
// Only column 0 is inspected; the child ID column is presumably of the same
// type, since each registered prototype declares both IDs alike.
virtual void setup(ServerInterface &srvInterface, const SizedColumnTypes &argTypes)
{
    argTypeOID = argTypes.getColumnType(0).getTypeOid();
}

virtual void processPartition(ServerInterface &srvInterface,
PartitionReader &input_reader,
PartitionWriter &output_writer)
{
map<vint, vint> parent;
map<vint, string> label;
map<vint, string> separator;


if (input_reader.getNumCols() != 4)
vt_report_error(0, "Function only accepts 4 argument, but %zu provided", input_reader.getNumCols());

do {
parent[input_reader.getIntRef(1)] = input_reader.getIntRef(0);
label[input_reader.getIntRef(1)] = input_reader.getStringRef(2).str();
separator[input_reader.getIntRef(1)] = input_reader.getStringRef(3).str();
switch(argTypeOID) {
case VarcharOID:
internalProcessPartition<string>(srvInterface, input_reader, output_writer);
break;
case Int8OID:
internalProcessPartition<vint>(srvInterface, input_reader, output_writer);
break;
default:
break;
}
}

private:

BaseDataOID argTypeOID; // data type OID of parent ID and child ID

// Integer overload: reports whether `value` equals vint_null.
// internalProcessPartition uses this as the stop condition while walking up
// the chain of parent IDs.
bool isEndValue(vint value)
{
return value == vint_null;
}

// String overload: an empty string marks the end of a parent chain.
// The argument is taken by const reference because this predicate is called
// on every step of the parent-chain walk; a by-value parameter would copy
// the ID string each time.
bool isEndValue(const std::string &value)
{
    return value.empty();
}

// Integer overload: stores the integer found in input column `idx` of the
// reader's current row into `output`.
void getValueFromReader(vint &output, PartitionReader &input_reader, size_t idx)
{
output = input_reader.getIntRef(idx);
}

std::string inString = input_reader.getStringRef(2).str();
// String overload: converts the string cell in input column `idx` of the
// reader's current row to a std::string and stores it in `output`.
void getValueFromReader(string &output, PartitionReader &input_reader, size_t idx)
{
    const VString &cell = input_reader.getStringRef(idx);
    output = cell.str();
}

//srvInterface.log(" adding %s ", inString.c_str());
// Integer overload: writes `value` into output column `idx` of the writer's
// current row.
void setValueToWriter(vint value, PartitionWriter &output_writer, size_t idx)
{
output_writer.setInt(idx, value);
}

void setValueToWriter(string value, PartitionWriter &output_writer, size_t idx)
{
VString &word = output_writer.getStringRef(idx);
word.copy(value);
}

#if COMPATLIB_DEBUG
// Debug helper (compiled only when COMPATLIB_DEBUG is non-zero): forwards
// `value` as a C string to srvInterface.log using `format`, which is
// expected to contain a single "%s" placeholder.
// Taken by const reference so that logging a long path does not copy it.
void log(ServerInterface &srvInterface, const char *format, const string &value)
{
    srvInterface.log(format, value.c_str());
}

// Debug helper (compiled only when COMPATLIB_DEBUG is non-zero): renders an
// integer as text and hands it to the string overload, so call sites can use
// the same "%s" format for both ID types.
void log(ServerInterface &srvInterface, const char *format, vint value)
{
    const string rendered = to_string(value);
    log(srvInterface, format, rendered);
}
#endif

template <typename T>
void internalProcessPartition(ServerInterface &srvInterface,
PartitionReader &input_reader,
PartitionWriter &output_writer)
{
map<T, T> parent;
map<T, string> label;
map<T, string> separator;

do {
T childValue;
// If child ID is null, skip it
if (input_reader.isNull(1)) {
continue;
} else {
getValueFromReader(childValue, input_reader, 1);
getValueFromReader(parent[childValue], input_reader, 0);
}

getValueFromReader(label[childValue], input_reader, 2);
getValueFromReader(separator[childValue], input_reader, 3);
} while (input_reader.next());
//srvInterface.log("1");
// exit(0);

map<vint, string> cache;
map<vint, vint> depth;
map<vint, string>::iterator p;
map<T, string> cache;
map<T, vint> depth;
typename map<T, string>::iterator p;

for (p = label.begin(); p != label.end(); ++p) {
if (cache.count(p->first) == 0) {

#if COMPATLIB_DEBUG
log(srvInterface, "working on [%s]", p->first);
#endif

if (cache.count(p->first) == 0) {
string output = p->second;
vint current_depth = 0;
vint current = parent[p->first];
while (current != 0 ) {
vint current_depth = 0ull;
T current = parent[p->first];

srvInterface.log("working on %lld", current);

#if COMPATLIB_DEBUG
log(srvInterface, " parent: [%s]", current);
#endif

while (!isEndValue(current)) {
if (cache.count(current) > 0 ) {
// Found the parent's path in the cache
output = cache[current] + separator[current] + output;
Expand All @@ -65,28 +142,28 @@ class ConnectBy : public TransformFunction

cache[p->first] = output;
depth[p->first] = current_depth;
}

srvInterface.log("attempting to output %lld", p->first);
#if COMPATLIB_DEBUG
log(srvInterface, " output: [%s]", output);
log(srvInterface, " depth : [%s]", current_depth);
#endif

}

output_writer.setInt(0,p->first);
output_writer.setInt(1,depth[p->first]);
VString &word = output_writer.getStringRef(2);
word.copy(cache[p->first]);
setValueToWriter(p->first, output_writer, 0);
setValueToWriter(depth[p->first], output_writer, 1);
setValueToWriter(cache[p->first], output_writer, 2);
output_writer.next();
}


}


};


class ConnectByFactory : public TransformFunctionFactory
// This factory accepts function calls with integer arguments for parent ID and child ID
class ConnectByIntFactory : public TransformFunctionFactory
{
// Tell Vertica that we take in a row with 5 inputs (parent ID, child ID, label, separator,
// and return a row with 2 strings (id, path)
// Tell Vertica that we take in a row with 4 inputs (parent ID, child ID, label, separator)
// and return a row with 3 values (id, depth, path)
virtual void getPrototype(ServerInterface &srvInterface, ColumnTypes &argTypes, ColumnTypes &returnType)
{
argTypes.addInt();
Expand All @@ -105,9 +182,9 @@ class ConnectByFactory : public TransformFunctionFactory
const SizedColumnTypes &input_types,
SizedColumnTypes &output_types)
{
// Error out if we're called with anything but 1 argument
// Error out if we're called with anything not 4 arguments
if (input_types.getColumnCount() != 4)
vt_report_error(0, "Function only accepts 3 arguments, but %zu provided", input_types.getColumnCount());
vt_report_error(0, "Function only accepts 4 arguments, but %zu provided", input_types.getColumnCount());


// Our output size will never be more than the input size
Expand All @@ -121,6 +198,43 @@ class ConnectByFactory : public TransformFunctionFactory

};

RegisterFactory(ConnectByFactory);
RegisterFactory(ConnectByIntFactory);


// This factory accepts function calls with varchar arguments for parent ID and child ID
class ConnectByVarcharFactory : public TransformFunctionFactory
{
// Tell Vertica that we take in a row with 4 inputs (parent ID, child ID, label, separator)
// and return a row with 3 values (id, depth, path)
virtual void getPrototype(ServerInterface &srvInterface, ColumnTypes &argTypes, ColumnTypes &returnType)
{
argTypes.addVarchar(); // parent ID
argTypes.addVarchar(); // child ID
argTypes.addVarchar(); // label
argTypes.addVarchar(); // separator

returnType.addVarchar(); // id
returnType.addInt(); // depth
returnType.addVarchar(); // separator
}

// Tell Vertica what our return string length will be, given the input string length
virtual void getReturnType(ServerInterface &srvInterface,
const SizedColumnTypes &input_types,
SizedColumnTypes &output_types)
{
// Error out if we're called with anything not 4 arguments
if (input_types.getColumnCount() != 4)
vt_report_error(0, "Function only accepts 4 arguments, but %zu provided", input_types.getColumnCount());

output_types.addVarchar(WIDTH, "identifier");
output_types.addInt("depth");
output_types.addVarchar(WIDTH, "path");
}

virtual TransformFunction *createTransformFunction(ServerInterface &srvInterface)
{ return vt_createFuncObj(srvInterface.allocator, ConnectBy); }

};

RegisterFactory(ConnectByVarcharFactory);