Skip to content

Commit

Permalink
Added udp support for text and pickle server #4
Browse files Browse the repository at this point in the history
  • Loading branch information
brianhks committed Jan 30, 2019
1 parent e509a4b commit a7a94ff
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 21 deletions.
54 changes: 53 additions & 1 deletion build.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import tablesaw.addons.java.JarRule
import tablesaw.addons.java.JavaCRule
import tablesaw.addons.java.JavaProgram
import tablesaw.addons.junit.JUnitRule
import tablesaw.definitions.Definition
import tablesaw.rules.DirectoryRule
import tablesaw.rules.Rule
import tablesaw.rules.SimpleRule
import tablesaw.rules.CopyRule

import javax.swing.*

Expand Down Expand Up @@ -188,7 +190,7 @@ def doIvyResolve(Rule rule)

libFileSets = [
new RegExFileSet("build/jar", ".*\\.jar"),
new RegExFileSet("lib", ".*\\.jar"),
//new RegExFileSet("lib", ".*\\.jar"),
ivyFileSet
]

Expand All @@ -199,6 +201,7 @@ zipLibDir = "kairosdb/lib"
zipConfDir = "kairosdb/conf"
tarRule = new TarRule("build/${programName}-${version}.tar")
.addDepend(jp.getJarRule())
.addDepend(resolveIvyFileSetRule)
.addFileTo(zipConfDir, "src/main/resources", "kairos-carbon.properties")

for (AbstractFileSet fs in libFileSets)
Expand Down Expand Up @@ -301,5 +304,54 @@ def doDeb(Rule rule)
}
}

installDir = saw.getProperty("kairos_home")

deployConfig = new CopyRule()
.addFile("src/main/resources/kairos-carbon.properties")
.setDestination(installDir + "/conf")

deployRule = new CopyRule("deploy").setDescription("Deploy to karios install")
.addDepend(resolveIvyFileSetRule)
.addDepend(jp.getJarRule())
.addDepend(deployConfig)
.setDestination(installDir + "/lib")

for (AbstractFileSet fs in libFileSets)
deployRule.addFileSet(fs)


saw.setDefaultTarget("jar")


//------------------------------------------------------------------------------
//Build notification
def printMessage(String title, String message) {
osName = saw.getProperty("os.name")

Definition notifyDef
if (osName.startsWith("Linux"))
{
notifyDef = saw.getDefinition("linux-notify")
}
else if (osName.startsWith("Mac"))
{
notifyDef = saw.getDefinition("mac-notify")
}

if (notifyDef != null)
{
notifyDef.set("title", title)
notifyDef.set("message", message)
saw.exec(notifyDef.getCommand())
}
}

def buildFailure(Exception e)
{
printMessage("Build Failure", e.getMessage())
}

def buildSuccess(String target)
{
printMessage("Build Success", target)
}
9 changes: 0 additions & 9 deletions graphite_docker.sh

This file was deleted.

5 changes: 4 additions & 1 deletion ivy.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
<dependency org="org.kairosdb" name="kairosdb" rev="1.2.1-1" conf="provided->default" />
<!--<dependency org="com.sun.jersey" name="jersey-servlet" rev="1.15" transitive="false"/>-->

<!--<dependency org="net.razorvine" name="pyrolite" rev="4.12" />-->
<dependency org="net.razorvine" name="pyrolite" rev="4.22">
<artifact name="pyrolite" type="jar"/>
<exclude org="net.razorvine" name="serpent"/>
</dependency>

<dependency org="junit" name="junit" rev="4.11" conf="test->default" />
<dependency org="org.mockito" name="mockito-core" rev="1.9.5" conf="test->default"/>
Expand Down
Binary file removed lib/pyrolite.jar
Binary file not shown.
18 changes: 15 additions & 3 deletions src/main/java/org/kairosdb/plugin/carbon/CarbonPickleServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.inject.Inject;
import com.google.inject.name.Named;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class CarbonPickleServer extends SimpleChannelUpstreamHandler implements
private final Publisher<DataPointEvent> m_publisher;
private final TagParser m_tagParser;
private ServerBootstrap m_serverBootstrap;
private ConnectionlessBootstrap m_udpBootstrap;

public CarbonPickleServer(FilterEventBus eventBus, TagParser tagParser)
{
Expand Down Expand Up @@ -101,13 +103,10 @@ public ChannelPipeline getPipeline() throws Exception
public void messageReceived(final ChannelHandlerContext ctx,
final MessageEvent msgevent)
{
logger.info("Message received");
if (msgevent.getMessage() instanceof List)
{
logger.info("yup list");
for (Object o : (List) msgevent.getMessage())
{
logger.info("verify pickle");
//todo verify cast
PickleMetric metric = (PickleMetric)o;

Expand Down Expand Up @@ -160,13 +159,26 @@ public void start() throws KairosDBException
// Bind and start to accept incoming connections.
m_serverBootstrap.bind(new InetSocketAddress(m_address, m_port));


m_udpBootstrap = new ConnectionlessBootstrap(
new NioDatagramChannelFactory());

m_udpBootstrap.setOption("receiveBufferSizePredictorFactory", new FixedReceiveBufferSizePredictorFactory(m_maxSize));

m_udpBootstrap.setPipelineFactory(this);

m_udpBootstrap.bind(new InetSocketAddress(m_port));

}

@Override
public void stop()
{
if (m_serverBootstrap != null)
m_serverBootstrap.shutdown();

if (m_udpBootstrap != null)
m_udpBootstrap.shutdown();
}

@Override
Expand Down
32 changes: 30 additions & 2 deletions src/main/java/org/kairosdb/plugin/carbon/CarbonTextServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@

import com.google.inject.Inject;
import com.google.inject.name.Named;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.jboss.netty.handler.codec.frame.LineBasedFrameDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.kairosdb.core.DataPoint;
import org.kairosdb.core.KairosDBService;
Expand Down Expand Up @@ -48,11 +52,16 @@ public class CarbonTextServer extends SimpleChannelUpstreamHandler implements Ch
private final TagParser m_tagParser;
private ServerBootstrap m_serverBootstrap;

@Inject
@Named("kairosdb.carbon.text.max_size")
private int m_maxSize = 2048;

@Inject
private LongDataPointFactory m_longDataPointFactory = new LongDataPointFactoryImpl();

@Inject
private DoubleDataPointFactory m_doubleDataPointFactory = new DoubleDataPointFactoryImpl();
private ConnectionlessBootstrap m_udpBootstrap;

public CarbonTextServer(FilterEventBus eventBus,
TagParser tagParser, @Named("kairosdb.carbon.text.port") int port)
Expand Down Expand Up @@ -85,8 +94,9 @@ public ChannelPipeline getPipeline() throws Exception
ChannelPipeline pipeline = Channels.pipeline();

// Add the text line codec combination first,
DelimiterBasedFrameDecoder frameDecoder = new DelimiterBasedFrameDecoder(
1024, Delimiters.lineDelimiter());
FrameDecoder frameDecoder = new LineBasedFrameDecoder(
m_maxSize, true, true);

pipeline.addLast("framer", frameDecoder);
pipeline.addLast("decoder", new WordSplitter());
pipeline.addLast("encoder", new StringEncoder());
Expand Down Expand Up @@ -122,6 +132,11 @@ public void messageReceived(final ChannelHandlerContext ctx,
return;
}

if ("NaN".equalsIgnoreCase(msgArr[2]))
{
logger.info("Metric {} has a timetamp of 'NaN'. Not sending to Kairos", msgArr[0]);
return;
}
long timestamp = Long.parseLong(msgArr[2]) * 1000; //Converting to milliseconds

DataPoint dp;
Expand Down Expand Up @@ -192,13 +207,26 @@ public void start() throws KairosDBException

// Bind and start to accept incoming connections.
m_serverBootstrap.bind(new InetSocketAddress(m_address, m_port));


m_udpBootstrap = new ConnectionlessBootstrap(
new NioDatagramChannelFactory());

m_udpBootstrap.setOption("receiveBufferSizePredictorFactory", new FixedReceiveBufferSizePredictorFactory(m_maxSize));

m_udpBootstrap.setPipelineFactory(this);

m_udpBootstrap.bind(new InetSocketAddress(m_port));
}

@Override
public void stop()
{
if (m_serverBootstrap != null)
m_serverBootstrap.shutdown();

if (m_udpBootstrap != null)
m_udpBootstrap.shutdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class Unpickler extends net.razorvine.pickle.Unpickler
private boolean is_tuple = false;

@Override
protected void dispatch(short key) throws IOException
protected Object dispatch(short key) throws IOException
{
if (key == Opcodes.TUPLE2)
{
Expand Down Expand Up @@ -51,7 +51,7 @@ else if ((key == Opcodes.TUPLE))
tupleOpcodeCounter++;
if(tupleOpcodeCounter%2 == 0){
tupleOpcodeCounter =0 ;
return ;
return NO_RETURN_VALUE;
}

//Pop three items from stack
Expand Down Expand Up @@ -87,7 +87,8 @@ else if(key == Opcodes.APPEND && is_tuple){

}
else
super.dispatch(key);
return super.dispatch(key);

return NO_RETURN_VALUE;
}
}
}
5 changes: 4 additions & 1 deletion src/main/resources/kairos-carbon.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ kairosdb.carbon.pickle.address=0.0.0.0
kairosdb.carbon.pickle.port=2004

# Determines the size of the buffer to allocate for incoming pickles
kairosdb.carbon.pickle.max_size=2048
kairosdb.carbon.pickle.max_size=4096

# Determines the size of the buffer to allocate for incoming text metrics
kairosdb.carbon.text.max_size=4096

# HostTagParser properties
kairosdb.carbon.hosttagparser.host_pattern=[^.]*\.([^.]*)\..*
Expand Down

0 comments on commit a7a94ff

Please sign in to comment.