
Adding dropTake to RDD. Allows dropping the first 'drop' elements and then taking the next 'take' elements #617

Open
wants to merge 3 commits into master

Conversation

@trulite commented May 19, 2013

  1. Added a dropTake(drop: Int, num: Int) method which drops the first 'drop' elements and then takes the next 'num' elements (a rough sketch of the semantics follows below).
  2. Added a test for take(num: Int).
  3. Added a test for dropTake(drop: Int, num: Int).
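
As a rough illustration of the intended semantics only (not the partition-by-partition implementation in this PR), a driver-side sketch could look like the following; it assumes a spark-shell session where `sc` is already defined and uses current org.apache.spark APIs:

```scala
// Hypothetical helper with dropTake(drop, num) semantics: skip the first `drop`
// elements, then return the next `num`. This naive version piggybacks on
// take(drop + num), so it is only sensible when drop + num is small.
def dropTakeNaive[T](rdd: org.apache.spark.rdd.RDD[T], drop: Int, num: Int): Array[T] =
  rdd.take(drop + num).drop(drop)

val rdd = sc.parallelize(1 to 16, 4)
dropTakeNaive(rdd, drop = 6, num = 4)   // Array(7, 8, 9, 10)
```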

@AmplabJenkins

Thank you for your pull request. An admin will review this request soon.

while ((drop - dropped) > 0 && it.hasNext) {
  it.next()
  dropped += 1
}
Contributor

I am not sure this will work the way you expect it to: "dropped" is updated on the worker and never brought back to the driver, so every worker will see it as 0.
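
For readers unfamiliar with the issue being raised, here is a minimal illustration (not code from this PR) of how a plain var captured by a task closure behaves, assuming a spark-shell session where `sc` is already defined:

```scala
// Each task receives its own serialized copy of the closure, so increments to
// `dropped` on the executors are never reflected in the driver's variable.
var dropped = 0
sc.parallelize(1 to 100, 4).foreach(_ => dropped += 1)
// On a cluster this prints 0; even in local mode the value is not reliable,
// which is why an accumulator (or a different algorithm) is needed here.
println(dropped)
```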

Author

Added another commit. Please take a look and see if it is okay. I added some comments that can aid the discussion; I can remove them later if we want to merge.

@mridulm (Contributor) commented May 19, 2013

Since I am not very familiar with the usage of accumulators, I will let someone else comment!
(On the face of it, it should work unless I am missing some subtlety.)

@@ -714,6 +714,42 @@ abstract class RDD[T: ClassManifest](
}

/**
* Drop the first drop elements and then take next num elements of the RDD. This currently scans the partitions *one by one*, so
* it will be slow if a lot of partitions are required. In that case, use dropCollect(drop) to get the
Member

We don't have a dropCollect method; maybe suggest collect().drop(drop) instead?

@JoshRosen (Member)

I think the current test only exercises one of the branches in

 val taken = if (leftToDrop > 0) it.take(0) else it.take(left)

To cover more cases, I'd add a test where we drop one or more complete partitions before taking results from the remaining partitions.

@trulite (Author) commented May 20, 2013

I added these tests, given the values 1..16 in 4 partitions:

  1. Drop none.
  2. Drop only from the first partition; take 2.
  3. Drop 6 (all 4 from the first partition and 2 from the second); take 2.
  4. Drop 10 (all 4 from the first and second partitions and 2 from the third) and take 6 values, so the take spills over into the next partition (sketched below).
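
A sketch of the spill-over case (test 4), assuming a spark-shell session with `sc` and assuming the dropTake(drop, num) method from this PR is available on RDD (it is not part of stock Spark):

```scala
// 1..16 in 4 partitions: [1..4], [5..8], [9..12], [13..16]
val rdd = sc.parallelize(1 to 16, 4)
// Drop 4 + 4 + 2 = 10 elements, then take 6: the take starts in the third
// partition (11, 12) and spills over into the fourth (13..16).
assert(rdd.dropTake(10, 6).toSeq == (11 to 16))
```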

@trulite (Author) commented Jun 23, 2013

Ping!

@mateiz (Member) commented Jun 23, 2013

Sorry for taking a while to get to this. I'm curious, what do you want to use this operation for? It seems potentially limited in use because it's going to scan the whole RDD linearly.

@trulite (Author) commented Jun 23, 2013

This provides LIMIT and top-K results like in SQL/LINQ. Sometimes it is useful to page through just 2-3 pages (of some page size) of the top-K results. This will not fetch the whole RDD the way collect does.
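
For example, a hypothetical paging call for that use case might look like this, again assuming a spark-shell `sc` and the dropTake method proposed here:

```scala
// "Top-K"-style ordering (largest first), then fetch the third page of 20 rows
// without collecting the whole RDD at the driver.
val ranked = sc.parallelize(1 to 1000, 8).sortBy(x => -x)
val pageSize = 20
val pageIndex = 2                                            // 0-based: third page
val page = ranked.dropTake(pageIndex * pageSize, pageSize)   // ranks 41..60
```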

@mridulm (Contributor) commented Jun 24, 2013

A) Instead of iteratively dropping, it would be better to (a rough sketch follows below):

a) Count records per partition.
b) Compute the records to drop and the records to keep per partition at the master (this per-partition dataset is small).
c) Apply a filter per partition based on (b) to generate a new RDD.

Of course, this assumes that:

  1. In the normal case, you touch more than one partition.
  2. The amount of data after the dropTake needs to be an RDD, particularly when it is too large to be collected at the master.

B) The above also removes the need to do collect().drop(drop) as mentioned in the earlier comment, since that will also not scale with the size of the RDD.
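
A rough sketch of that per-partition plan (the helper name and details below are mine, not code from this PR), assuming a spark-shell session with `sc`:

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// (a) count each partition in one parallel job, (b) work out at the driver which
// part of the global index range [drop, drop + num) falls into each partition,
// (c) slice every partition in parallel to produce a new RDD.
def dropTakeAsRdd[T: ClassTag](rdd: RDD[T], drop: Long, num: Long): RDD[T] = {
  val counts: Array[Long] = rdd
    .mapPartitionsWithIndex((i, it) => Iterator((i, it.size.toLong)))
    .collect()
    .sortBy(_._1)
    .map(_._2)

  // starts(i) = number of elements in all partitions before partition i
  val starts = counts.scanLeft(0L)(_ + _)

  rdd.mapPartitionsWithIndex((i, it) => {
    val lo = math.max(drop - starts(i), 0L)                                // local index to start keeping
    val hi = math.max(math.min(drop + num, starts(i + 1)) - starts(i), 0L) // local index to stop at
    it.slice(lo.toInt, hi.toInt)                                           // assumes per-partition counts fit in Int
  })
}

dropTakeAsRdd(sc.parallelize(1 to 16, 4), drop = 10L, num = 6L).collect()  // Array(11, ..., 16)
```

Note that this scans the input twice (once to count, once to slice), so if it were used repeatedly for paging, the per-partition counts would be worth caching, as Matei points out further down.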

@trulite (Author) commented Jun 24, 2013

A)
a) Isn't counting itself a linear operation?

  1. Actually, in my use case it is likely that I touch only one partition. But yes, this may well be more than one partition.
  2. Usually when doing limit and take, the number of elements to take is small. This operation is similar to 'take', except we allow the user to skip a certain number of pages in the result.

@mridulm (Contributor) commented Jun 24, 2013

  1. Let me explain it this way:

Assume T seconds to process a partition (for simplicity's sake, count/drop/etc. all take the same time, though that is not accurate). In this PR, if you need to process N partitions, you take N * T seconds. In my proposal, you take at most about 2 * T * P seconds, where P is num_partitions / num_cores; ideally, P is 1.
(A slightly gross simplification for illustration purposes; data locality, etc. come into play. Hope you get the idea :-) )

Though count is linear, all partitions get scheduled in parallel, so unless you are running on a single core, or the number of partitions is much higher than the number of cores, you won't pay num_partitions * time_per_partition. Ideally, the cost of a distributed count equals the cost of a single partition's count, so it is agnostic to the number of partitions for reasonable counts.

Btw, we are not doing a global count, but a count per partition.

  2. Specifically for some requirements, I agree it would be a feasible solution, and probably the ideal solution for small values of offset and count!
    To the point where I think even a general solution should keep this approach around as an optimization when the values are small enough (i.e., if we estimate that we touch only a small number of partitions: num_touched <<< num_partitions, even if num_touched >> 1).

But as a general construct on RDD, which might get used for all use cases, I am worried about the performance implications, particularly the ability to cause OOM and bad scaling, when used on large datasets.

@trulite (Author) commented Jun 24, 2013

Thanks! I understand and agree. I will whip up something along these lines soon and update this PR.

@mateiz (Member) commented Jun 24, 2013

If your use case is paging with small values of "drop" and "take", then consider just calling take() on the first few thousand elements and paging through that as a local collection. Otherwise this seems like a very specific thing to add for just one use case.

A parallel slicing operation like Mridul suggests could also be good, but again I'd like to understand where it will be used, because it seems like a complicated operation to add and to implement well. Just as an example, the counts there might be better off being cached across calls to dropTake if it will be used for paging.
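
A tiny sketch of the first suggestion (take a bounded prefix once, then page through it locally), assuming a spark-shell session with `sc`:

```scala
// One job pulls a bounded prefix to the driver; paging is then a local slice.
val pageSize = 20
val prefix = sc.parallelize(1 to 100000, 8).take(5000)            // bounded driver-side copy
def page(p: Int): Array[Int] = prefix.slice(p * pageSize, (p + 1) * pageSize)
page(2)                                                           // elements 41..60 of the prefix
```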

@AmplabJenkins

Thank you for your pull request. An admin will review this request soon.

@DomingoExabeam

Any movement on this request? I could use this.

@trulite (Author) commented Sep 20, 2013

I was whipping up a generic tool in which I needed to allow the user to page through the top-K elements. Since a generic tool is a one-off use case, I am using my solution (this PR as of the last update). Since this is driven by a human user, they will only page through a small number of pages, so the solution I have works better for me than the parallel slicing Mridul suggested.

I agree with Matei that this might be dangerous to have around as a generic RDD construct.

That said, I have experimented with Mridul's approach and might have the code lying around. So if we intend to have this PR pulled I will update the code and test cases.

lucarosellini pushed a commit to Stratio/deep-spark that referenced this pull request Mar 14, 2014