Data Sources


Matrix Data Sources

A matrix datasource is a class which implements a block iterator over a set of input matrices. The iterator supports next() and hasNext() methods. The input matrices can be dense or sparse. Each call to next() on the datasource returns an array of Mat instances (generic matrices). The length of this array is the same as the number of matrices in the source. For unsupervised learning, the datasource usually holds a single matrix, and each call to next() returns an array of length 1 containing a single matrix (this matrix will contain many instances, however). For supervised learning, there is usually a data matrix and a matrix of labels. They should have the same number of columns, since the column index references each input instance. You construct a matrix datasource like this:

> val dopts = new MatSource.Options
> val m = new MatSource(Array(datamat, labelsmat), dopts)

You can adjust the options even after creating the datasource. They are not used until you call init on the datasource.
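
A typical iteration over the source looks like this (a minimal sketch, assuming datamat and labelsmat are existing data and label matrices):

> m.init                           // apply the options and prepare the source
> while (m.hasNext) {
>   val mats = m.next              // array of length 2: data block, then label block
>   val datablock = mats(0)
>   val labelblock = mats(1)       // same number of columns as datablock
>   ...                            // process the minibatch
> }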

The options to MatSource are:

   var batchSize = 10000
   var sizeMargin = 3f
   var sample = 1f
   var addConstFeat:Boolean = false
   var featType:Int = 1                 // 0 = binary features, 1 = linear features
   var putBack = -1

The batchSize is the size of the minibatch (the number of instances, i.e. the number of columns) returned by each call to next().

sizeMargin is a parameter that supports caching of matrix containers. Since the minibatch size stays the same during a training run, every matrix returned by next has the same size in rows x columns. You can save a lot of allocation effort by caching these matrices. But sparse matrices have a variable number of non-zeros. The caching system re-uses a sparse matrix container until it is found to be too small, then grows it by this factor. This heuristic works quite well at avoiding future allocations.

sample is a parameter which determines whether the input is sampled (i.e. whether every input instance is used, or a random subset). This parameter specifies the random fraction in the latter case. Sampling is done without replacement.

addConstFeat, when true, adds an extra row of all 1's to each block of the first output matrix when next() is called. This adds a constant term that is useful in many algorithms (regression, collaborative filtering).

featType can be either 0 (binary features) or 1 (linear features). When this flag is 0, all non-zero output values are set to 1's. If this flag is 1, the output values are exactly the same as in the input matrices.
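
For example, the options above might be combined like this (a sketch; the option values are illustrative only):

> val dopts = new MatSource.Options
> dopts.batchSize = 50000          // larger minibatches
> dopts.sample = 0.1f              // use a random 10% of the input instances
> dopts.addConstFeat = true        // append a constant row of 1's
> dopts.featType = 0               // binarize all non-zero feature values
> val m = new MatSource(Array(datamat, labelsmat), dopts)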

putBack Option

putBack is an integer which specifies the index of the matrix returned by next() which is "put back" into the datasource on the next call to next(). That is, putBack turns the datasource into a data source/sink. For instance:

> dopts.putBack = 1                // matrix number 1 is the putback matrix
> val m = new MatSource(Array(datamat, predsmat), dopts)
> m.init
> val x = m.next                   // x is an array of two matrix blocks
> val dblock = x(0)
> val pblock = x(1)                // Contents of this matrix will be put back in the DS
> ...                              // Modify pblock
> val y = m.next                   // pblock contents are saved into "predsmat"

putBack is a scalable approach to prediction and factor models. The size of a matrix of predictions grows with the size of the data source that provides the data. For factor models which approximate a data matrix M as X * Y, the number of columns of Y matches the number of columns of M, which can be very large. It's natural to support an update method which pushes those values into the datasource from which data instances are being pulled. For matrix data sources, this simply means that the predictions can be taken from the second matrix argument to the MatSource constructor. For Files data sources, it means predictions will be saved to the filesystem*.

  * putBack to Files DataSources is not yet supported, but coming soon.

Files Data Sources

Files data sources support much larger datasets, up to the capacity of the file system. A files datasource is implemented with a collection of files on disk, with each file holding a matrix of source data or, e.g., class labels. FileSource abstracts the location and size of these files. The size of the matrix blocks returned by next can be smaller or larger than the size of these matrices on disk. Several types of matrix can be streamed at the same time, e.g. a FileSource can front a collection of sparse data and dense label matrices, with one of each returned by next.

To create a FileSource, you need at a minimum to define the option opts.fnames:List[(Int)=>String], which contains a list of functions mapping integers to names of files. Each element of this list points to matrices of the same type, and the i^th function in this list controls the i^th matrix that is returned by next. FileSource includes the options for a general DataSource, plus other parameters:

lookahead(4): the number of files to prefetch, and the number of threads to use. 
sampleFiles(1f): randomly sample this fraction of columns.
nstart(0): the starting file index (inclusive).
nend(0): the ending file index (exclusive).
dorows(false): output blocks of rows (if true) or columns.
order(1): randomize the file read order if (1), don't otherwise. 

You can omit nstart and nend, in which case the datasource will search for the longest contiguous sequence of (k) files starting with number 0, and assign the results nstart=0, nend=k. A minimal construction is sketched below.
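
A minimal construction sketch, assuming a hypothetical directory dataDir holding files data00.smat.lz4, data01.smat.lz4, ...; FileSource.sampleFun (also used in the SFile example below) builds a filename function from a format string:

> val fopts = new FileSource.Options
> fopts.fnames = List(FileSource.sampleFun(dataDir + "data%02d.smat.lz4", 1, 0))
> fopts.batchSize = 100000
> fopts.order = 1                   // randomize the file read order
> val fs = new FileSource(fopts)    // nstart/nend omitted: uses the longest file run from 0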

Files data sources work very well with non-RAID disk arrays, and generally give higher throughput. We recorded a throughput of 1.5 GB/s on an array of 16 commodity 2TB disks. You should lower the lookahead count on smaller arrays.

Date Functions

Files DataSources often hold data that is organized by time (e.g. log files, tweets, emails etc). There are convenience functions:

encodeDate(year:Int, month:Int, day:Int, hour:Int)
(year:Int, month:Int, day:Int, hour:Int) = decodeDate(n:Int)

to support sequential access to files organized by date/time. Note that these functions do not attempt to map real calendar dates to consecutive integers, so the encoded values will have gaps.
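
For example (a sketch; fopts is a FileSource options instance as above):

> val code = FileSource.encodeDate(2012, 3, 1, 0)       // encode (year, month, day, hour)
> val (year, month, day, hour) = FileSource.decodeDate(code)
> fopts.nstart = FileSource.encodeDate(2012, 3, 1, 0)   // first file: March 1, 2012 (inclusive)
> fopts.nend = FileSource.encodeDate(2012, 4, 1, 0)     // stop before April 1, 2012 (exclusive)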

SFile Data Sources

The SFiles Data Source extends FileSource with features to support dynamic featurization of sparse input data. SFileSource outputs SMat blocks and the data are stored on disk as SMat files. When using multiple file sequences, SFileSource stacks blocks from those sequences vertically into a single SMat instead of returning separate matrices like FileSource. So, e.g., you can combine unigram, bigram and trigram features dynamically in blocks of various sizes with SFileSource. SFileSource has the options above for FileSource, plus:

fcounts(null): an IMat containing the sizes of the blocks to stack vertically. 
eltsPerSample: upper bound on the number of non-zeros per input sample. 
               Helps the system size the SMat output arrays. 

Example

This example creates an SFileSource from raw files containing unigram, bigram and trigram data. The sizes (number of rows) for each feature group will be 50,000, 100,000 and 200,000 respectively. The output of next will be SMat blocks with 350,000 rows containing all the above features. Featurization with BIDMach typically sorts features in decreasing order of frequency, so setting a feature count to k catches the k most frequent features of that type. This is one of the most common and effective ways to do feature selection. The typical size of a tweet is about 9 words (and similar for the number of bigrams and trigrams per tweet), so the eltsPerSample value is set conservatively at 40.

def twitterNgrams(
  nstart0:Int = FileSource.encodeDate(2012,3,1,0),   // first file (date-encoded)
  nend0:Int = FileSource.encodeDate(2012,12,1,0),    // last file (date-encoded, exclusive)
  n:Int = 1, 
  i:Int = 0, 
  nuni:Int = 50,                                     // unigram feature count, in thousands
  nbi:Int = 100,                                     // bigram feature count, in thousands
  ntri:Int = 200) = {                                // trigram feature count, in thousands
    val opts = new SFileSource.Options {  
      fnames = List(
        FileSource.sampleFun(twitterFeatureDir + "unifeats%02d.smat.lz4", n, i),
        FileSource.sampleFun(twitterFeatureDir + "bifeats%02d.smat.lz4", n, i),
        FileSource.sampleFun(twitterFeatureDir + "trifeats%02d.smat.lz4", n, i)
      )
      fcounts = icol(nuni*1000,nbi*1000,ntri*1000)   // rows to stack from each file sequence
      nstart = nstart0/n
      nend = nend0/n
      order = 1                                      // randomize the file read order
      batchSize = 100000
      eltsPerSample = 40                             // ~9 words per tweet, so 40 is conservative
      lookahead = 3
    }
    new SFileSource(opts)
  }

Several more examples of the use of SFileSource can be found in BIDMach/Experiments.scala.

Blended DataSource

A blended DataSource combines streams from two DataSources into a single stream. Its constructor takes two DataSources as input. It has these options:

bBlock(1000): subBlock size to use for mixing.
afrac(0.5f): fraction of output samples from first input DS 
samp1(1f): sampling rate for first input DS 
samp2(1f): sampling rate for second input DS

The sampling is random, but in blocks of size bBlock, i.e. at each step, BlendedSource chooses a block of size bBlock with probability afrac from source 1, and otherwise from source 2. The sampling rates for the two streams allow multiple streams to be roughly synchronized. E.g. evenly sampling (afrac = 0.5) from streams A and B where size(A) ~ 10x size(B) will cause A to be consumed much faster than B and lead to temporal misalignment. Sampling A at 0.1 will restore approximate synchronization.

Example

This example blends a stream of general tweets and a stream of tweets containing emoticons. The first stream is approximately 10x larger, and sampling is used to roughly synchronize them.

def twitterWordBlend(
  nstart:Int = FileSource.encodeDate(2012,3,1,0),
  nend:Int = FileSource.encodeDate(2013,7,1,0),
  n:Int = 1,
  i:Int = 0,
  nfeats:Int = 10000) = {  
  val ds1 = twitterWords(nstart, nend, n, i, nfeats)         // general tweets, ~10x larger stream
  val ds2 = twitterSmileyWords(nstart, nend, n, i, nfeats)   // tweets containing emoticons
  if (n > 1) {
     ds1.opts.lookahead = 2
     ds2.opts.lookahead = 2
  }
  val opts3 = new BlendedSource.Options
  opts3.afrac = 0.5f                 // draw half the output blocks from each source
  opts3.samp1 = 0.1f                 // sample the larger stream at 10% for rough synchronization
  opts3.samp2 = 1f                   // use all of the smaller stream
  new BlendedSource(ds1, ds2, opts3)
}