This is a fork of Twitter/HBC that supports streaming from a Gnip PowerTrack 2.0 stream.
We don't have a build process established around this repository; we use jitpack.io to build the package. You can see the available package versions here: https://jitpack.io/#u2i/hbc
For more details, please review the sg-tweets-collection README.
Twitter's Hosebird client (HBC) has been a popular streaming consumer app for many years. Originally built around the Twitter (public) Streaming API, it was extended a couple of years ago to work with Gnip PowerTrack streams. Since it was written to work with Twitter APIs, it natively supported OAuth authentication. The main trick with extending the library to work with Gnip streams was building in Basic Authentication. That update was made back in 2014 and Gnip customers have been happily using HBC with PowerTrack v1 ever since.
This week we sat down to update HBC to work with PowerTrack 2.0. PowerTrack 2.0 is hosted on Twitter servers, so going in we assumed that the only updates required were changes to the realtime PowerTrack endpoint URLs. However, after those straightforward updates we discovered that HBC's attempts to use Basic Authentication with the Twitter servers were failing with 401 errors.
After some experimentation, we discovered that explicitly adding an Authorization header to the connection request was what was needed to authenticate and start streaming data.
Several HBC files were updated to make the library compatible with PowerTrack 2.0. The updated HBC library is available HERE. See below for a tour of the code changes made.
The following HBC files were updated to work with PTv2 and to add an authentication request header:
- com.twitter.hbc.core.HttpConstants
- com.twitter.hbc.core.endpoint.EnterpriseStreamingEndpoint_v2
- com.twitter.hbc.example.EnterpriseStream_v2
Note that these updates were implemented to enable this library to stream from both versions of PowerTrack. If you only want to use HBC with PowerTrack 2.0, all the 'v2' details can be dropped, and the updates can be folded into the existing non-versioned name space.
In the constants file, we just added the root host domain for PowerTrack 2.0, https://gnip-stream.twitter.com.
package com.twitter.hbc.core;
public class Constants {
public static final String ENTERPRISE_STREAM_HOST = "https://stream.gnip.com";
public static final String ENTERPRISE_STREAM_HOST_v2 = "https://gnip-stream.twitter.com";
}
Here, we cloned the EnterpriseStreamingEndpoint class and created a new EnterpriseStreamingEndpoint_v2 class.
The only significant change was in constructing the v2 BASE_PATH, using the same product, account name, publisher and stream label tokens, and arranging them in the v2 order:
private static final String BASE_PATH = "/stream/%s/accounts/%s/publishers/%s/%s.json"; //product, account_name, publisher, stream_label
The only other changes were updating the class constructors to reflect the new class name.
- public EnterpriseStreamingEndpoint_v2(String account, String product, String label)
- public EnterpriseStreamingEndpoint_v2(String account, String product, String label, int clientId)
- public EnterpriseStreamingEndpoint_v2(String account, String publisher, String product, String label, int clientId)
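To make the v2 token order concrete, here is a minimal sketch of how the path template expands with String.format. Only the template and the host come from the code above; the class name, the buildPath method, and the sample values (example-account, twitter, powertrack, prod) are hypothetical and chosen purely for illustration.

```java
// Sketch only: the class name, buildPath, and all sample values are hypothetical.
class PowerTrackV2PathSketch {
    // Template from the v2 endpoint class: product, account_name, publisher, stream_label.
    private static final String BASE_PATH = "/stream/%s/accounts/%s/publishers/%s/%s.json";

    static String buildPath(String account, String publisher, String product, String label) {
        // Arguments arrive in constructor order and are rearranged into the v2 URL order.
        return String.format(BASE_PATH, product, account, publisher, label);
    }

    public static void main(String[] args) {
        String host = "https://gnip-stream.twitter.com";
        // prints "https://gnip-stream.twitter.com/stream/powertrack/accounts/example-account/publishers/twitter/prod.json"
        System.out.println(host + buildPath("example-account", "twitter", "powertrack", "prod"));
    }
}
```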
This class contains the key update that enables basic authentication with PowerTrack v2. After creating an HTTP request object, new code was added to explicitly add a basic authentication header to the request. Adding this header requires Base64 encoding of the authentication username and password.
The Base64 encoding is implemented with this new import:
import sun.misc.BASE64Encoder;
Here is the code block that adds the header to the request:
auth.signRequest(request, postContent);
//PTv2 update: Explicitly adding Authorization header with Base64 encoded username and password.
BASE64Encoder encoder = new BASE64Encoder();
String authToken = auth.getUsername() + ":" + auth.getPassword();
String authValue = "Basic " + encoder.encode(authToken.getBytes());
request.addHeader("Authorization", authValue);
Connection conn = new Connection(client, processor);
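Note that sun.misc.BASE64Encoder is an internal, unsupported JDK class and may be unavailable on newer Java releases. As a hedged alternative (not what this fork does), the same header value can be built with java.util.Base64, which has been in the standard library since Java 8 and whose basic encoder emits no line separators. The class name, the basicAuthValue helper, and the placeholder credentials below are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch only: BasicAuthHeaderSketch and basicAuthValue are hypothetical names.
class BasicAuthHeaderSketch {
    static String basicAuthValue(String username, String password) {
        String authToken = username + ":" + password;
        // The basic encoder produces a single line, which is what a header value needs.
        String encoded = Base64.getEncoder()
                .encodeToString(authToken.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Placeholder credentials, not real ones.
        System.out.println(basicAuthValue("user", "pass")); // prints "Basic dXNlcjpwYXNz"
    }
}
```

With a helper like this, the header line from the block above would become request.addHeader("Authorization", basicAuthValue(auth.getUsername(), auth.getPassword())).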
Authentication is an interface that is implemented by both the BasicAuth and OAuth1 classes. As a first attempt, getUsername and getPassword methods were added to the interface. This is not ideal since both classes must implement the methods, and OAuth1 has no use for a username or password. For now, OAuth1 implements the 'get' methods by simply returning null, but a good next step would be to eliminate the (small) Authentication interface.
public interface Authentication {
String getUsername();
String getPassword();
}
The BasicAuth class implements the Authentication interface. Since we added the getUsername() and getPassword() methods to that interface, we implement the methods here.
public String getUsername() {
return this.username;
}
public String getPassword() {
return this.password;
}
The OAuth1 class also implements the Authentication interface. Since we added the getUsername() and getPassword() methods to that interface, we implement the methods here. This is not ideal since the OAuth1 class has no use for the username and password, and the implemented methods simply return null. A good next step would be to eliminate the (small) Authentication interface.
public String getUsername() {
return null;
}
public String getPassword() {
return null;
}
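One possible way to avoid the null-returning getters, sketched here purely as an illustration and not as a change made in this fork, is to move the username and password accessors into a separate sub-interface that only basic-auth implementations adopt. Every type below is a hypothetical stand-in for the HBC classes of similar name, and BasicCredentials and basicHeaderFor are invented for this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;

// Sketch only: all types here are hypothetical stand-ins, not the real HBC classes.
class AuthHeaderSketch {
    // Returns a header value only when the authenticator actually carries basic credentials.
    static Optional<String> basicHeaderFor(Authentication auth) {
        if (!(auth instanceof BasicCredentials)) {
            return Optional.empty();
        }
        BasicCredentials creds = (BasicCredentials) auth;
        String token = creds.getUsername() + ":" + creds.getPassword();
        String encoded = Base64.getEncoder().encodeToString(token.getBytes(StandardCharsets.UTF_8));
        return Optional.of("Basic " + encoded);
    }

    public static void main(String[] args) {
        System.out.println(basicHeaderFor(new BasicAuthSketch("user", "pass")).isPresent()); // prints "true"
        System.out.println(basicHeaderFor(new OAuth1Sketch()).isPresent());                  // prints "false"
    }
}

// Stand-in for the shared interface; request signing methods are omitted.
interface Authentication { }

// Only authenticators that really have a username and password implement this.
interface BasicCredentials extends Authentication {
    String getUsername();
    String getPassword();
}

final class BasicAuthSketch implements BasicCredentials {
    private final String username;
    private final String password;

    BasicAuthSketch(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String getUsername() { return username; }
    public String getPassword() { return password; }
}

// The OAuth stand-in no longer needs getters that return null.
final class OAuth1Sketch implements Authentication { }
```

The trade-off is an instanceof check at the point where the header is added, in exchange for keeping credential accessors off classes that have no credentials to return.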
To implement an HBC library that can stream from both versions of PowerTrack, this PTv2 example client is a clone of the PTv1 version (EnterpriseStreamExample.java). The updates here consist of:
- Creating an endpoint object based on the RealTimeEnterpriseStreamingEndpoint_v2 class.
- Passing in the PTv2 host constant when creating the client object.
- Uncommenting the main method.
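public class EnterpriseStream_v2 {
  public static void run(String username, String password, String account, String label, String product) throws InterruptedException {
    // The queue and auth objects are created as in EnterpriseStreamExample.java (omitted here).
    RealTimeEnterpriseStreamingEndpoint_v2 endpoint = new RealTimeEnterpriseStreamingEndpoint_v2(account, product, label);
    // Create a new BasicClient. By default gzip is enabled.
    Client client = new ClientBuilder()
        .name("PowerTrackClient-01")
        .hosts(Constants.ENTERPRISE_STREAM_HOST_v2)
        .endpoint(endpoint)
        .authentication(auth)
        .processor(new LineStringProcessor(queue))
        .build();
    // Connecting and reading messages are unchanged from the PTv1 example (omitted here).
  }
  public static void main(String[] args) {
    try {
      EnterpriseStream_v2.run(args[0], args[1], args[2], args[3], args[4]);
    } catch (InterruptedException e) {
      System.out.println(e);
    }
  }
}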