diff --git a/Readme.md b/Readme.md index b21aa17..4142166 100644 --- a/Readme.md +++ b/Readme.md @@ -11,21 +11,6 @@ See: * Encourage well-behaved streaming API clients * Operate independently of PHP extensions (ie: shared memory, PCNTL, etc) -In short: - - require_once('Phirehose.php'); - class MyStream extends Phirehose - { - public function enqueueStatus($status) - { - print $status; - } - } - - $stream = new MyStream('username', 'password'); - $stream->consume(); - - ## What this library does do ## * Handles connection/authentication to the twitter streaming API * Consumes the stream handing off each status to be enqueued by a method of your choice @@ -38,6 +23,15 @@ In short: * Provide any sort of inter-process communication * Provide any non-streaming API functionality (ie: user profile info, search, etc) +## How To Use ## + +See the example subdirectory for example usage. In each example file you will need to insert your own oauth token/secret, and the key/secret for the Twitter app you have created. + + * filter-oauth.php shows how to follow certain keywords. + * sample.php shows how to get a small random sample of all public statuses. + * userstream-alternative.php shows how to get user streams. (All activity for one user.) + * sitestream.php shows to how to get site streams. (All activity for multiple users.) + Please see the wiki for [documentation](https://github.com/fennb/phirehose/wiki/Introduction). If you have any additional questions, head over to the Phirehose Users group [http://groups.google.com/group/phirehose-users] diff --git a/example/sitestream.php b/example/sitestream.php new file mode 100644 index 0000000..8fa3574 --- /dev/null +++ b/example/sitestream.php @@ -0,0 +1,62 @@ +array(123,2334,9876)); + * + * Each tweet of your friends looks like: + * [id] => 1011234124121 + * [text] => (the tweet) + * [user] => array( the user who tweeted ) + * [entities] => array ( urls, etc. ) + * + * Every 30 seconds we get the keep-alive message, where $status is empty. + * + * When the user adds a friend we get one of these: + * [event] => follow + * [source] => Array( my user ) + * [created_at] => Tue May 24 13:02:25 +0000 2011 + * [target] => Array (the user now being followed) + * + * @param string $status + */ + public function enqueueStatus($status) + { + /* + * In this simple example, we will just display to STDOUT rather than enqueue. + * NOTE: You should NOT be processing tweets at this point in a real application, instead they + * should be being enqueued and processed asyncronously from the collection process. + */ + $data = json_decode($status, true); + echo date("Y-m-d H:i:s (").strlen($status)."):".print_r($data,true)."\n"; + } + +} + +//These are the application key and secret +//You can create an application, and then get this info, from https://dev.twitter.com/apps +//(They are under OAuth Settings, called "Consumer key" and "Consumer secret") +define('TWITTER_CONSUMER_KEY', 'XXXXXXXXXX'); +define('TWITTER_CONSUMER_SECRET', 'XXXXXXXXXX'); + +//These are the user's token and secret +//You can get this from https://dev.twitter.com/apps, under the "Your access token" +//section for your app. +define('OAUTH_TOKEN', 'XXXXXXXXXX'); +define('OAUTH_SECRET', 'XXXXXXXXXX'); + +// Start streaming +$sc = new MyUserConsumer(OAUTH_TOKEN, OAUTH_SECRET, Phirehose::METHOD_SITE); +$sc->setFollow(array( + 1234, 5678, 901234573 //The user IDs of the twitter accounts to follow. All of + //these users must have given your app permission. + )); +$sc->consume(); diff --git a/example/userstream-alternative.php b/example/userstream-alternative.php new file mode 100644 index 0000000..7ec0fb6 --- /dev/null +++ b/example/userstream-alternative.php @@ -0,0 +1,60 @@ +array(123,2334,9876)); + * + * Each tweet of your friends looks like: + * [id] => 1011234124121 + * [text] => (the tweet) + * [user] => array( the user who tweeted ) + * [entities] => array ( urls, etc. ) + * + * Every 30 seconds we get the keep-alive message, where $status is empty. + * + * When the user adds a friend we get one of these: + * [event] => follow + * [source] => Array( my user ) + * [created_at] => Tue May 24 13:02:25 +0000 2011 + * [target] => Array (the user now being followed) + * + * @param string $status + */ + public function enqueueStatus($status) + { + /* + * In this simple example, we will just display to STDOUT rather than enqueue. + * NOTE: You should NOT be processing tweets at this point in a real application, instead they + * should be being enqueued and processed asyncronously from the collection process. + */ + $data = json_decode($status, true); + echo date("Y-m-d H:i:s (").strlen($status)."):".print_r($data,true)."\n"; + } + +} + +//These are the application key and secret +//You can create an application, and then get this info, from https://dev.twitter.com/apps +//(They are under OAuth Settings, called "Consumer key" and "Consumer secret") +define('TWITTER_CONSUMER_KEY', 'XXXXXXXXXX'); +define('TWITTER_CONSUMER_SECRET', 'XXXXXXXXXX'); + +//These are the user's token and secret +//You can get this from https://dev.twitter.com/apps, under the "Your access token" +//section for your app. +define('OAUTH_TOKEN', 'XXXXXXXXXX'); +define('OAUTH_SECRET', 'XXXXXXXXXX'); + +// Start streaming +$sc = new MyUserConsumer(OAUTH_TOKEN, OAUTH_SECRET, Phirehose::METHOD_USER); +$sc->consume(); diff --git a/example/userstream-simple.php b/example/userstream-simple.php index 4bfe725..36ac2ad 100644 --- a/example/userstream-simple.php +++ b/example/userstream-simple.php @@ -1,5 +1,4 @@ username; if (isset($params['oauth_verifier'])) { @@ -134,36 +141,9 @@ protected function getOAuthHeader($method, $url, $params = array()) return $oauth; } - protected function getAuthorizationHeader() + /** Overrides base class function */ + protected function getAuthorizationHeader($url,$requestParams) { - $url = self::URL_BASE . $this->method . '.' . $this->format; - $urlParts = parse_url($url); - - // Setup params appropriately - $requestParams = array('delimited' => 'length'); - - // Setup the language of the stream - if($this->lang) { - $requestParams['language'] = $this->lang; - } - - // Filter takes additional parameters - if (count($this->trackWords) > 0) - { - $requestParams['track'] = implode(',', $this->trackWords); - } - if (count($this->followIds) > 0) - { - $requestParams['follow'] = implode(',', $this->followIds); - } - if (count($this->locationBoxes) > 0) - { - $requestParams['locations'] = implode(',', $this->locationBoxes); - } - if (count($this->count) <> 0) { - $requestParams['count'] = $this->count; - } - return $this->getOAuthHeader('POST', $url, $requestParams); } } diff --git a/lib/Phirehose.php b/lib/Phirehose.php index de5a09b..f26b16b 100644 --- a/lib/Phirehose.php +++ b/lib/Phirehose.php @@ -6,7 +6,7 @@ * - http://code.google.com/p/phirehose/wiki/Introduction * - http://dev.twitter.com/pages/streaming_api * @author Fenn Bailey - * @version 0.2.gitmaster + * @version 1.0RC */ abstract class Phirehose { @@ -14,7 +14,6 @@ abstract class Phirehose /** * Class constants */ - const URL_BASE = 'https://stream.twitter.com/1.1/statuses/'; const FORMAT_JSON = 'json'; const FORMAT_XML = 'xml'; const METHOD_FILTER = 'filter'; @@ -22,7 +21,15 @@ abstract class Phirehose const METHOD_RETWEET = 'retweet'; const METHOD_FIREHOSE = 'firehose'; const METHOD_LINKS = 'links'; + const METHOD_USER = 'user'; //See UserstreamPhirehose.php + const METHOD_SITE = 'site'; //See UserstreamPhirehose.php + const EARTH_RADIUS_KM = 6371; + + /** + * @internal Moved from being a const to a variable, because some methods (user and site) need to change it. + */ + protected $URL_BASE = 'https://stream.twitter.com/1.1/statuses/'; /** @@ -140,7 +147,7 @@ abstract class Phirehose protected $idleReconnectTimeout = 90; protected $avgPeriod = 60; protected $status_length_base = 10; - protected $userAgent = 'Phirehose/0.2.gitmaster +https://github.com/fennb/phirehose'; + protected $userAgent = 'Phirehose/1.0RC +https://github.com/fennb/phirehose'; protected $filterCheckMin = 5; protected $filterUpdMin = 120; protected $tcpBackoff = 1; @@ -150,15 +157,22 @@ abstract class Phirehose /** * Create a new Phirehose object attached to the appropriate twitter stream method. - * Methods are: METHOD_FIREHOSE, METHOD_RETWEET, METHOD_SAMPLE, METHOD_FILTER, METHOD_LINKS + * Methods are: METHOD_FIREHOSE, METHOD_RETWEET, METHOD_SAMPLE, METHOD_FILTER, METHOD_LINKS, METHOD_USER, METHOD_SITE. Note: the method might cause the use of a different endpoint URL. * Formats are: FORMAT_JSON, FORMAT_XML * @see Phirehose::METHOD_SAMPLE * @see Phirehose::FORMAT_JSON * - * @param string $username Any twitter username - * @param string $password Any twitter password + * @param string $username Any twitter username. When using oAuth, this is the 'oauth_token'. + * @param string $password Any twitter password. When using oAuth this is you oAuth secret. * @param string $method * @param string $format + * + * @todo I've kept the "/2/" at the end of the URL for user streams, as that is what + * was there before AND it works for me! But the official docs say to use /1.1/ + * so that is what I have used for site. + * https://dev.twitter.com/docs/api/1.1/get/user + * + * @todo Shouldn't really hard-code URL strings in this function. */ public function __construct($username, $password, $method = Phirehose::METHOD_SAMPLE, $format = self::FORMAT_JSON, $lang = FALSE) { @@ -167,6 +181,11 @@ public function __construct($username, $password, $method = Phirehose::METHOD_SA $this->method = $method; $this->format = $format; $this->lang = $lang; + switch($method){ + case self::METHOD_USER:$this->URL_BASE = 'https://userstream.twitter.com/2/';break; + case self::METHOD_SITE:$this->URL_BASE = 'https://sitestream.twitter.com/1.1/';break; + default:break; //Stick to the default + } } /** @@ -370,6 +389,10 @@ public function getLang() * Connects to the stream API and consumes the stream. Each status update in the stream will cause a call to the * handleStatus() method. * + * Note: in normal use this function does not return. + * If you pass $reconnect as false, it will still not return in normal use: it will only return + * if the remote side (Twitter) close the socket. (Or the socket dies for some other external reason.) + * * @see handleStatus() * @param boolean $reconnect Reconnects as per recommended * @throws ErrorException @@ -390,7 +413,13 @@ public function consume($reconnect = TRUE) $fdw = $fde = NULL; // Placeholder write/error file descriptors for stream_select // We use a blocking-select with timeout, to allow us to continue processing on idle streams - while ($this->conn !== NULL && !feof($this->conn) && ($numChanged = stream_select($this->fdrPool, $fdw, $fde, $this->readTimeout)) !== FALSE) { + //TODO: there is a bug lurking here. If $this->conn is fine, but $numChanged returns zero, because readTimeout was + // reached, then we should consider we still need to call statusUpdate() every 60 seconds, etc. + // ($this->readTimeout is 5 seconds.) This can be quite annoying. E.g. Been getting data regularly for 55 seconds, + // then it goes quiet for just 10 or so seconds. It is now 65 seconds since last call to statusUpdate() has been + // called, which might mean a monitoring system kills the script assuming it has died. + while ($this->conn !== NULL && !feof($this->conn) && + ($numChanged = stream_select($this->fdrPool, $fdw, $fde, $this->readTimeout)) !== FALSE) { /* Unfortunately, we need to do a safety check for dead twitter streams - This seems to be able to happen where * you end up with a valid connection, but NO tweets coming along the wire (or keep alives). The below guards * against this. @@ -404,48 +433,59 @@ public function consume($reconnect = TRUE) } // Process stream/buffer $this->fdrPool = array($this->conn); // Must reassign for stream_select() - $this->buff .= fread($this->conn, 6); // Small non-blocking to get delimiter text - if (($eol = strpos($this->buff, "\r\n")) === FALSE) { - continue; // We need a newline - } + + //Get a full HTTP chunk. + //NB. This is a tight loop, not using stream_select. + //NB. If that causes problems, then perhaps put something to give up after say trying for 10 seconds? (but + // the stream will be all messed up, so will need to do a reconnect). + $chunk_info=trim(fgets($this->conn)); //First line is hex digits giving us the length + if($chunk_info=='')continue; //Usually indicates a time-out. If we wanted to be sure, + //then stream_get_meta_data($this->conn)['timed_out']==1. (We could instead + // look at the 'eof' member, which appears to be boolean false if just a time-out.) + //TODO: need to consider calling statusUpdate() every 60 seconds, etc. + // Track maximum idle period + // (We got start of an HTTP chunk, this is stream activity) $this->idlePeriod = (time() - $lastStreamActivity); $this->maxIdlePeriod = ($this->idlePeriod > $this->maxIdlePeriod) ? $this->idlePeriod : $this->maxIdlePeriod; - // We got a newline, this is stream activity $lastStreamActivity = time(); - // Read status length delimiter - $delimiter = substr($this->buff, 0, $eol); - $this->buff = substr($this->buff, $eol + 2); // consume off buffer, + 2 = "\r\n" - $statusLength = intval($delimiter, $this->status_length_base); - if ($statusLength > 0) { - // Read status bytes and enqueue - $bytesLeft = $statusLength - strlen($this->buff); - while ( $bytesLeft > 0 - && $this->conn !== NULL - && !feof($this->conn) - && ($numChanged = stream_select($this->fdrPool, $fdw, $fde, 0, 20000)) !== FALSE - && (time() - $lastStreamActivity) <= $this->idleReconnectTimeout) { - $this->fdrPool = array($this->conn); // Reassign - $this->buff .= fread($this->conn, $bytesLeft); // Read until all bytes are read into buffer - $bytesLeft = ($statusLength - strlen($this->buff)); - } - // Accrue/enqueue and track time spent enqueing - $enqueueStart = microtime(TRUE); - $this->enqueueStatus($this->buff); - $this->enqueueSpent += (microtime(TRUE) - $enqueueStart); - $this->statusCount++; - } else { - // Timeout/no data after readTimeout seconds - + + //Append one HTTP chunk to $this->buff + $len=hexdec($chunk_info); //$len includes the \r\n at the end of the chunk (despite what wikipedia says) + //TODO: could do a check for data corruption here. E.g. if($len>100000){...} + $s=''; + $len+=2; //For the \r\n at the end of the chunk + while(!feof($this->conn)){ + $s.=fread($this->conn,$len-strlen($s)); + if(strlen($s)>=$len)break; //TODO: Can never be >$len, only ==$len?? + } + $this->buff.=substr($s,0,-2); //This is our HTTP chunk + + //Process each full tweet inside $this->buff + while(1){ + $eol = strpos($this->buff,"\r\n"); //Find next line ending + if($eol===false)break; //Time to get more data + $enqueueStart = microtime(TRUE); + $this->enqueueStatus(substr($this->buff,0,$eol)); + $this->enqueueSpent += (microtime(TRUE) - $enqueueStart); + $this->statusCount++; + $this->buff = substr($this->buff,$eol+2); //+2 to allow for the \r\n } + + //NOTE: if $this->buff is not empty, it is tempting to go round and get the next HTTP chunk, as + // we know there is data on the incoming stream. However, this could mean the below functions (heartbeat + // and statusUpdate) *never* get called, which would be bad. + // Calc counter averages $this->avgElapsed = time() - $lastAverage; if ($this->avgElapsed >= $this->avgPeriod) { $this->statusRate = round($this->statusCount / $this->avgElapsed, 0); // Calc tweets-per-second // Calc time spent per enqueue in ms - $this->enqueueTimeMS = ($this->statusCount > 0) ? round($this->enqueueSpent / $this->statusCount * 1000, 2) : 0; + $this->enqueueTimeMS = ($this->statusCount > 0) ? + round($this->enqueueSpent / $this->statusCount * 1000, 2) : 0; // Calc time spent total in filter predicate checking - $this->filterCheckTimeMS = ($this->filterCheckCount > 0) ? round($this->filterCheckSpent / $this->filterCheckCount * 1000, 2) : 0; + $this->filterCheckTimeMS = ($this->filterCheckCount > 0) ? + round($this->filterCheckSpent / $this->filterCheckCount * 1000, 2) : 0; $this->heartbeat(); $this->statusUpdate(); @@ -546,11 +586,13 @@ protected function connect() } // Construct URL/HTTP bits - $url = self::URL_BASE . $this->method . '.' . $this->format; + $url = $this->URL_BASE . $this->method . '.' . $this->format; $urlParts = parse_url($url); // Setup params appropriately - $requestParams = array('delimited' => 'length'); + $requestParams=array(); + + //$requestParams['delimited'] = 'length'; //No, we don't want this any more // Setup the language of the stream if($this->lang) { @@ -561,7 +603,8 @@ protected function connect() if ($this->method == self::METHOD_FILTER && count($this->trackWords) > 0) { $requestParams['track'] = implode(',', $this->trackWords); } - if ($this->method == self::METHOD_FILTER && count($this->followIds) > 0) { + if ( ($this->method == self::METHOD_FILTER || $this->method == self::METHOD_SITE) + && count($this->followIds) > 0) { $requestParams['follow'] = implode(',', $this->followIds); } if ($this->method == self::METHOD_FILTER && count($this->locationBoxes) > 0) { @@ -630,40 +673,35 @@ protected function connect() $postData = str_replace('+','%20',$postData); //Change it from RFC1738 to RFC3986 (see //enc_type parameter in http://php.net/http_build_query and note that enc_type is //not available as of php 5.3) - $authCredentials = $this->getAuthorizationHeader(); + $authCredentials = $this->getAuthorizationHeader($url,$requestParams); // Do it - fwrite($this->conn, "POST " . $urlParts['path'] . " HTTP/1.0\r\n"); - fwrite($this->conn, "Host: " . $urlParts['host'] . ':' . $port . "\r\n"); - fwrite($this->conn, "Content-type: application/x-www-form-urlencoded\r\n"); - fwrite($this->conn, "Content-length: " . strlen($postData) . "\r\n"); - fwrite($this->conn, "Accept: */*\r\n"); - fwrite($this->conn, 'Authorization: ' . $authCredentials . "\r\n"); - fwrite($this->conn, 'User-Agent: ' . $this->userAgent . "\r\n"); - fwrite($this->conn, "\r\n"); - fwrite($this->conn, $postData . "\r\n"); - fwrite($this->conn, "\r\n"); + $s = "POST " . $urlParts['path'] . " HTTP/1.1\r\n"; + $s.= "Host: " . $urlParts['host'] . ':' . $port . "\r\n"; + $s .= "Connection: Close\r\n"; + $s.= "Content-type: application/x-www-form-urlencoded\r\n"; + $s.= "Content-length: " . strlen($postData) . "\r\n"; + $s.= "Accept: */*\r\n"; + $s.= 'Authorization: ' . $authCredentials . "\r\n"; + $s.= 'User-Agent: ' . $this->userAgent . "\r\n"; + $s.= "\r\n"; + $s.= $postData . "\r\n"; + $s.= "\r\n"; - $this->log("POST " . $urlParts['path'] . " HTTP/1.0\r\n"); - $this->log("Host: " . $urlParts['host'] . ':' . $port . "\r\n"); - $this->log("Content-type: application/x-www-form-urlencoded\r\n"); - $this->log("Content-length: " . strlen($postData) . "\r\n"); - $this->log("Accept: */*\r\n"); - $this->log('Authorization: ' . $authCredentials . "\r\n"); - $this->log('User-Agent: ' . $this->userAgent . "\r\n"); - $this->log("\r\n"); - $this->log($postData . "\r\n"); - $this->log("\r\n"); + fwrite($this->conn, $s); + $this->log($s); // First line is response list($httpVer, $httpCode, $httpMessage) = preg_split('/\s+/', trim(fgets($this->conn, 1024)), 3); // Response buffers $respHeaders = $respBody = ''; + $isChunking = false; // Consume each header response line until we get to body while ($hLine = trim(fgets($this->conn, 4096))) { - $respHeaders .= $hLine; + $respHeaders .= $hLine."\n"; + if($hLine=='Transfer-Encoding: chunked')$isChunking=true; } // If we got a non-200 response, we need to backoff and retry @@ -671,6 +709,7 @@ protected function connect() $connectFailures++; // Twitter will disconnect on error, but we want to consume the rest of the response body (which is useful) + //TODO: this might be chunked too? In which case this contains some bad characters?? while ($bLine = trim(fgets($this->conn, 4096))) { $respBody .= $bLine; } @@ -696,7 +735,10 @@ protected function connect() continue; } // End if not http 200 - + else{ + if(!$isChunking)throw new Exception("Twitter did not send a chunking header. Is this really HTTP/1.1? Here are headers:\n$respHeaders"); //TODO: rather crude! + } + // Loop until connected OK } while (!is_resource($this->conn) || $httpCode != 200); @@ -717,8 +759,9 @@ protected function connect() } - protected function getAuthorizationHeader() + protected function getAuthorizationHeader($url,$requestParams) { + throw new Exception("Basic auth no longer works with Twitter. You must derive from OauthPhirehose, not directly from the Phirehose class."); $authCredentials = base64_encode($this->username . ':' . $this->password); return "Basic: ".$authCredentials; } diff --git a/lib/UserstreamPhirehose.php b/lib/UserstreamPhirehose.php index 368659c..372af43 100644 --- a/lib/UserstreamPhirehose.php +++ b/lib/UserstreamPhirehose.php @@ -1,496 +1,21 @@ auth_method = $auth_method; - } - - - protected function connect() { - if ($this->auth_method === UserstreamPhirehose::CONNECT_OAUTH) { - $this->connect_oauth(); - } else { - $this->connect_basic(); - } - } - - - /** - * Connects to the stream URL using the configured method. - */ - protected function connect_basic() { - - // Init state - $connectFailures = 0; - $tcpRetry = $this->tcpBackoff / 2; - $httpRetry = $this->httpBackoff / 2; - - // Keep trying until connected (or max connect failures exceeded) - do { - - // Check filter predicates for every connect (for filter method) - if ($this->method == self::METHOD_FILTER) { - $this->checkFilterPredicates(); - } - - // Construct URL/HTTP bits - $url = self::URL_BASE . $this->method . '.' . $this->format; - $urlParts = parse_url($url); - $authCredentials = base64_encode($this->username . ':' . $this->password); - - // Setup params appropriately - $requestParams = array('delimited' => 'length'); - - // Filter takes additional parameters - if ($this->method == self::METHOD_USER && count($this->trackWords) > 0) { - $requestParams['track'] = implode(',', $this->trackWords); - } - if ($this->method == self::METHOD_USER && count($this->followIds) > 0) { - $requestParams['follow'] = implode(',', $this->followIds); - } - - - // Debugging is useful - $this->log('Connecting to twitter stream: ' . $url . ' with params: ' . str_replace("\n", '', - var_export($requestParams, TRUE))); - - /** - * Open socket connection to make POST request. It'd be nice to use stream_context_create with the native - * HTTP transport but it hides/abstracts too many required bits (like HTTP error responses). - */ - $errNo = $errStr = NULL; - $scheme = ($urlParts['scheme'] == 'https') ? 'ssl://' : 'tcp://'; - $port = ($urlParts['scheme'] == 'https') ? 443 : 80; - - /** - * We must perform manual host resolution here as Twitter's IP regularly rotates (ie: DNS TTL of 60 seconds) and - * PHP appears to cache it the result if in a long running process (as per Phirehose). - */ - $streamIPs = gethostbynamel($urlParts['host']); - if (empty($streamIPs)) { - throw new PhirehoseNetworkException("Unable to resolve hostname: '" . $urlParts['host'] . '"'); - } - - // Choose one randomly (if more than one) - $this->log('Resolved host ' . $urlParts['host'] . ' to ' . implode(', ', $streamIPs)); - $streamIP = $streamIPs[rand(0, (count($streamIPs) - 1))]; - $this->log('Connecting to ' . $streamIP); - - @$this->conn = fsockopen($scheme . $streamIP, $port, $errNo, $errStr, $this->connectTimeout); - - // No go - handle errors/backoff - if (!$this->conn || !is_resource($this->conn)) { - $this->lastErrorMsg = $errStr; - $this->lastErrorNo = $errNo; - $connectFailures ++; - if ($connectFailures > $this->connectFailuresMax) { - $msg = 'TCP failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; - $this->log($msg,'error'); - throw new PhirehoseConnectLimitExceeded($msg, $errNo); // Throw an exception for other code to handle - } - // Increase retry/backoff up to max - $tcpRetry = ($tcpRetry < $this->tcpBackoffMax) ? $tcpRetry * 2 : $this->tcpBackoffMax; - $this->log('TCP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . - $errStr . ' (' . $errNo . '). Sleeping for ' . $tcpRetry . ' seconds.','info'); - sleep($tcpRetry); - continue; - } - - // TCP connect OK, clear last error (if present) - $this->log('Connection established to ' . $streamIP); - $this->lastErrorMsg = NULL; - $this->lastErrorNo = NULL; - - // If we have a socket connection, we can attempt a HTTP request - Ensure blocking read for the moment - stream_set_blocking($this->conn, 1); - - // Encode request data - $postData = http_build_query($requestParams); - - // Do it - fwrite($this->conn, "POST " . $urlParts['path'] . " HTTP/1.0\r\n"); - fwrite($this->conn, "Host: " . $urlParts['host'] . "\r\n"); - fwrite($this->conn, "Content-type: application/x-www-form-urlencoded\r\n"); - fwrite($this->conn, "Content-length: " . strlen($postData) . "\r\n"); - fwrite($this->conn, "Accept: */*\r\n"); - fwrite($this->conn, 'Authorization: Basic ' . $authCredentials . "\r\n"); - fwrite($this->conn, 'User-Agent: ' . self::USER_AGENT . "\r\n"); - fwrite($this->conn, "\r\n"); - fwrite($this->conn, $postData . "\r\n"); - fwrite($this->conn, "\r\n"); - - // First line is response - list($httpVer, $httpCode, $httpMessage) = preg_split('/\s+/', trim(fgets($this->conn, 1024)), 3); - - // Response buffers - $respHeaders = $respBody = ''; - - // Consume each header response line until we get to body - while ($hLine = trim(fgets($this->conn, 4096))) { - $respHeaders .= $hLine; - } - - // If we got a non-200 response, we need to backoff and retry - if ($httpCode != 200) { - $connectFailures ++; - - // Twitter will disconnect on error, but we want to consume the rest of the response body (which is useful) - while ($bLine = trim(fgets($this->conn, 4096))) { - $respBody .= $bLine; - } - - // Construct error - $errStr = 'HTTP ERROR ' . $httpCode . ': ' . $httpMessage . ' (' . $respBody . ')'; - - // Set last error state - $this->lastErrorMsg = $errStr; - $this->lastErrorNo = $httpCode; - - // Have we exceeded maximum failures? - if ($connectFailures > $this->connectFailuresMax) { - $msg = 'Connection failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; - $this->log($msg,'error'); - throw new PhirehoseConnectLimitExceeded($msg, $httpCode); // We eventually throw an exception for other code to handle - } - // Increase retry/backoff up to max - $httpRetry = ($httpRetry < $this->httpBackoffMax) ? $httpRetry * 2 : $this->httpBackoffMax; - $this->log('HTTP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . - $errStr . '. Sleeping for ' . $httpRetry . ' seconds.','info'); - sleep($httpRetry); - continue; - - } // End if not http 200 - - // Loop until connected OK - } while (!is_resource($this->conn) || $httpCode != 200); - - // Connected OK, reset connect failures - $connectFailures = 0; - $this->lastErrorMsg = NULL; - $this->lastErrorNo = NULL; - - // Switch to non-blocking to consume the stream (important) - stream_set_blocking($this->conn, 0); - - // Connect always causes the filterChanged status to be cleared - $this->filterChanged = FALSE; - - // Flush stream buffer & (re)assign fdrPool (for reconnect) - $this->fdrPool = array($this->conn); - $this->buff = ''; - - } - - - protected function connect_oauth() { - - // Init state - $connectFailures = 0; - $tcpRetry = $this->tcpBackoff / 2; - $httpRetry = $this->httpBackoff / 2; - - // Keep trying until connected (or max connect failures exceeded) - do { - - // Check filter predicates for every connect (for filter method) - if ($this->method == self::METHOD_FILTER) { - $this->checkFilterPredicates(); - } - - // Construct URL/HTTP bits - $url = self::URL_BASE . $this->method . '.' . $this->format; - $urlParts = parse_url($url); - $authCredentials = base64_encode($this->username . ':' . $this->password); - - // Setup params appropriately - $requestParams = array('delimited' => 'length'); - - // Filter takes additional parameters - if ($this->method == self::METHOD_USER && count($this->trackWords) > 0) { - $requestParams['track'] = implode(',', $this->trackWords); - } - if ($this->method == self::METHOD_USER && count($this->followIds) > 0) { - $requestParams['follow'] = implode(',', $this->followIds); - } - - - // Debugging is useful - $this->log('Connecting to twitter stream: ' . $url . ' with params: ' . str_replace("\n", '', - var_export($requestParams, TRUE))); - - /** - * Open socket connection to make POST request. It'd be nice to use stream_context_create with the native - * HTTP transport but it hides/abstracts too many required bits (like HTTP error responses). - */ - $errNo = $errStr = NULL; - $scheme = ($urlParts['scheme'] == 'https') ? 'ssl://' : 'tcp://'; - $port = ($urlParts['scheme'] == 'https') ? 443 : 80; - - /** - * We must perform manual host resolution here as Twitter's IP regularly rotates (ie: DNS TTL of 60 seconds) and - * PHP appears to cache it the result if in a long running process (as per Phirehose). - */ - $streamIPs = gethostbynamel($urlParts['host']); - if (empty($streamIPs)) { - throw new PhirehoseNetworkException("Unable to resolve hostname: '" . $urlParts['host'] . '"'); - } - - // Choose one randomly (if more than one) - $this->log('Resolved host ' . $urlParts['host'] . ' to ' . implode(', ', $streamIPs)); - $streamIP = $streamIPs[rand(0, (count($streamIPs) - 1))]; - $this->log('Connecting to ' . $streamIP); - - @$this->conn = fsockopen($scheme . $streamIP, $port, $errNo, $errStr, $this->connectTimeout); - - // No go - handle errors/backoff - if (!$this->conn || !is_resource($this->conn)) { - $this->lastErrorMsg = $errStr; - $this->lastErrorNo = $errNo; - $connectFailures ++; - if ($connectFailures > $this->connectFailuresMax) { - $msg = 'TCP failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; - $this->log($msg,'error'); - throw new PhirehoseConnectLimitExceeded($msg, $errNo); // Throw an exception for other code to handle - } - // Increase retry/backoff up to max - $tcpRetry = ($tcpRetry < $this->tcpBackoffMax) ? $tcpRetry * 2 : $this->tcpBackoffMax; - $this->log('TCP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . - $errStr . ' (' . $errNo . '). Sleeping for ' . $tcpRetry . ' seconds.','info'); - sleep($tcpRetry); - continue; - } - - // TCP connect OK, clear last error (if present) - $this->log('Connection established to ' . $streamIP); - $this->lastErrorMsg = NULL; - $this->lastErrorNo = NULL; - - // If we have a socket connection, we can attempt a HTTP request - Ensure blocking read for the moment - stream_set_blocking($this->conn, 1); - - // Encode request data - $postData = http_build_query($requestParams); - - // Oauth tokens - $oauthHeader = $this->getOAuthHeader('POST', $url, $requestParams); - - // Do it - fwrite($this->conn, "POST " . $urlParts['path'] . " HTTP/1.0\r\n"); - fwrite($this->conn, "Host: " . $urlParts['host'].':'.$port . "\r\n"); - fwrite($this->conn, "Content-type: application/x-www-form-urlencoded\r\n"); - fwrite($this->conn, "Content-length: " . strlen($postData) . "\r\n"); - fwrite($this->conn, 'User-Agent: ' . self::USER_AGENT . "\r\n"); - fwrite($this->conn, $oauthHeader."\r\n"); - fwrite($this->conn, "\r\n"); - fwrite($this->conn, $postData . "\r\n"); - fwrite($this->conn, "\r\n"); - - $this->log("POST " . $urlParts['path'] . " HTTP/1.0"); - $this->log("Host: " . $urlParts['host'].':'.$port); - $this->log("Content-type: application/x-www-form-urlencoded"); - $this->log("Content-length: " . strlen($postData)); - $this->log('User-Agent: ' . self::USER_AGENT); - $this->log($oauthHeader); - $this->log(''); - $this->log($postData); - $this->log(''); - - // First line is response - list($httpVer, $httpCode, $httpMessage) = preg_split('/\s+/', trim(fgets($this->conn, 1024)), 3); - - // Response buffers - $respHeaders = $respBody = ''; - - // Consume each header response line until we get to body - while ($hLine = trim(fgets($this->conn, 4096))) { - $respHeaders .= $hLine; - } - - // If we got a non-200 response, we need to backoff and retry - if ($httpCode != 200) { - $connectFailures ++; - - // Twitter will disconnect on error, but we want to consume the rest of the response body (which is useful) - while ($bLine = trim(fgets($this->conn, 4096))) { - $respBody .= $bLine; - } - - // Construct error - $errStr = 'HTTP ERROR ' . $httpCode . ': ' . $httpMessage . ' (' . $respBody . ')'; - - // Set last error state - $this->lastErrorMsg = $errStr; - $this->lastErrorNo = $httpCode; - - // Have we exceeded maximum failures? - if ($connectFailures > $this->connectFailuresMax) { - $msg = 'Connection failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; - $this->log($msg,'error'); - throw new PhirehoseConnectLimitExceeded($msg, $httpCode); // We eventually throw an exception for other code to handle - } - // Increase retry/backoff up to max - $httpRetry = ($httpRetry < $this->httpBackoffMax) ? $httpRetry * 2 : $this->httpBackoffMax; - $this->log('HTTP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . - $errStr . '. Sleeping for ' . $httpRetry . ' seconds.','info'); - sleep($httpRetry); - continue; - - } // End if not http 200 - - // Loop until connected OK - } while (!is_resource($this->conn) || $httpCode != 200); - - // Connected OK, reset connect failures - $connectFailures = 0; - $this->lastErrorMsg = NULL; - $this->lastErrorNo = NULL; - - // Switch to non-blocking to consume the stream (important) - stream_set_blocking($this->conn, 0); - - // Connect always causes the filterChanged status to be cleared - $this->filterChanged = FALSE; - - // Flush stream buffer & (re)assign fdrPool (for reconnect) - $this->fdrPool = array($this->conn); - $this->buff = ''; - - } - - - protected function prepareParameters($method = null, $url = null, $params = null) { - if(empty($method) || empty($url)) { - return false; - } - - - $oauth['oauth_consumer_key'] = TWITTER_CONSUMER_KEY; - $oauth['oauth_token'] = $this->username; - $oauth['oauth_nonce'] = md5(uniqid(rand(), true)); - $oauth['oauth_timestamp'] = time(); - $oauth['oauth_signature_method'] = 'HMAC-SHA1'; - if(isset($params['oauth_verifier'])) { - $oauth['oauth_verifier'] = $params['oauth_verifier']; - unset($params['oauth_verifier']); - } - $oauth['oauth_version'] = '1.0'; - // encode all oauth values - foreach($oauth as $k => $v) - $oauth[$k] = $this->encode_rfc3986($v); - - // encode all non '@' params - // keep sigParams for signature generation (exclude '@' params) - // rename '@key' to 'key' - $sigParams = array(); - $hasFile = false; - if (is_array($params)) { - foreach($params as $k => $v) { - if(strncmp('@',$k,1) !== 0) { - $sigParams[$k] = $this->encode_rfc3986($v); - $params[$k] = $this->encode_rfc3986($v); - } else { - $params[substr($k, 1)] = $v; - unset($params[$k]); - $hasFile = true; - } - } - - if($hasFile === true) { - $sigParams = array(); - } - - } - - $sigParams = array_merge($oauth, (array)$sigParams); - - // sorting - ksort($sigParams); - - // print_r($sigParams); - - // signing - $oauth['oauth_signature'] = $this->encode_rfc3986($this->generateSignature($method, $url, $sigParams)); - return array('request' => $params, 'oauth' => $oauth); - } - - - protected function encode_rfc3986($string) { - return str_replace('+', ' ', str_replace('%7E', '~', rawurlencode(($string)))); - } - - - protected function generateSignature($method = null, $url = null, $params = null) { - if(empty($method) || empty($url)) { - return false; - } - - - // concatenating and encode - $concat = ''; - foreach((array)$params as $key => $value) { - $concat .= "{$key}={$value}&"; - } - $concat = substr($concat, 0, -1); - $concatenatedParams = $this->encode_rfc3986($concat); - - // normalize url - $urlParts = parse_url($url); - $scheme = strtolower($urlParts['scheme']); - $host = strtolower($urlParts['host']); - $port = isset($urlParts['port']) ? intval($urlParts['port']) : 0; - $retval = strtolower($scheme) . '://' . strtolower($host); - if (!empty($port) && (($scheme === 'http' && $port != 80) || ($scheme === 'https' && $port != 443))) { - $retval .= ":{$port}"; - } - - $retval .= $urlParts['path']; - if (!empty($urlParts['query'])) { - $retval .= "?{$urlParts['query']}"; - } - - $normalizedUrl = $this->encode_rfc3986($retval); - $method = $this->encode_rfc3986($method); // don't need this but why not? - - $signatureBaseString = "{$method}&{$normalizedUrl}&{$concatenatedParams}"; - $this->log('DEBUG: ' . var_export($signatureBaseString, TRUE)); - - # sign the signature string - $key = $this->encode_rfc3986(TWITTER_CONSUMER_SECRET) . '&' . $this->encode_rfc3986($this->password); - return base64_encode(hash_hmac('sha1', $signatureBaseString, $key, true)); - } - - - protected function getOAuthHeader($method, $url, $data) { - $params = $this->prepareParameters($method, $url, $data); - $oauthHeaders = $params['oauth']; - $urlParts = parse_url($url); - $oauth = 'Authorization: OAuth realm="' . $urlParts['scheme'] . '://' . $urlParts['host'] . $urlParts['path'] . '", '; - foreach($oauthHeaders as $name => $value) { - $oauth .= "{$name}=\"{$value}\", "; - } - $oauth = substr($oauth, 0, -2); - return $oauth; - } - +require_once('OauthPhirehose.php'); + +/** +* This class just exists to have a different default method +* (i.e. more for backwards-compatibility than anything) +*/ +abstract class UserstreamPhirehose extends OauthPhirehose { + + /** + * This function just exists to send a different default for $method + * + * @param String $username,$password The oauth token and oauth secret. + * @param String $method Set to self::METHOD_USER for user streams, and to + * self::METHOD_SITE for site streams. + */ + public function __construct($username, $password, $method = self::METHOD_USER, $format = self::FORMAT_JSON, $lang = FALSE) { + parent::__construct($username, $password, $method, $format, $lang); + } + }