diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..284630a --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.idea +riak-data-migrator.iml +.DS_Store +target diff --git a/README b/README deleted file mode 100644 index c80788a..0000000 --- a/README +++ /dev/null @@ -1,69 +0,0 @@ -riak-data-migrator - -This tool allows a user to connect to a Riak host or Riak cluster and copy -one or more buckets from Riak to a local disk and then, likewise, load one -or more buckets that have been stored to disk back into Riak. - -Usage: -java -jar riak-data-migrator-0.2.4.jar [options] - -Options: --l Set to Load buckets. Cannot be used with d or k. --d Set to Dump buckets. Cannot be used with l or k. Must specify one or more buckets ---copy Set to Copy buckets to one cluster to another. Cannot be used with d, k, l, k or delete. --k Set to Dump bucket keys. Cannot be used with d or l. Cannot be used with t. --t Transfer bucket properties. Will dump or load bucket properties instead of data. - Cannot be used with k. ---delete Delete bucket data. cannot be used with d, l, k, or t --r Set the path for data to be loaded to or dumped from. - The path must exist and is required. --a Load or Dump all buckets. Cannot be used with delete --b Load or Dump a single bucket. --f Load or Dump a file containing line delimited - bucket names --h Specify Riak host. Required if a cluster host name file is - not specified. --c Specify a file containing line delimited Riak - Cluster Host Names. Required if a host name is not specified. - host name is not specified. --p Specify Riak Protocol Buffers Port. If not specified, defaults to 8087. --H Specify Riak HTTP Port. If not specified, defaults to 8098. ---copyhost Specify destination Riak host for *copy* operation ---copyhostsfile Specify a file containing Cluster Host Names. Req'd - if a single copyhost not specified. ---copypbport Specify destination protocol buffers port, defaults to 8087. --v Output verbose status output to the command line. Default. --s Turn off verbose status. --q Specify the queue size, especially if working with larger object sizes. There are - at most 2 queues for Load/Dump operations. - ---riakworkercount Specify the number of workers used to read from/write - to Riak. ---maxriakconnections Specify the number of connections to maintain - in the Riak connection pool. 
- -Examples: -Dump all buckets from Riak -java -jar riak-data-migrator-0.2.4.jar -d -r /var/riak_export -a -h 127.0.0.1 -p 8087 \ --H 8098 - -Load all buckets previously dumped back into Riak -java -jar riak-data-migrator-0.2.4jar -l -r /var/riak-export -a -h 127.0.0.1 -p 8087 \ --H 8098 - - -Dump bucket settings from a bucket named "Flights": -java -jar riak-data-migrator-0.2.4.jar -d -t -r /var/riak-export -b Flights -h 127.0.0.1 \ --p 8087 -H 8098 - -Load bucket settings for a bucket named "Flights": -java -jar riak-data-migrator-0.2.4.jar -l -t -r /var/riak-export -b Flights -h 127.0.0.1 \ --p 8087 -H 8098 - -Dump buckets listed in a line delimited file from a Riak cluster -java -jar riak-data-migrator-0.2.4.jar -d -f /home/riakadmin/buckets_to_export.txt -r \ -/var/riak-export -c /home/riakadmin/riak_hosts.txt -p 8087 -H 8098 - -Copy all buckets from one riak host to another: -java -jar riak-data-migrator-0.2.4.jar -copy -r /var/riak_export -a -h 127.0.0.1 -p 8087 \ ---copyhost 192.168.1.100 --copypbport 8087 \ No newline at end of file diff --git a/README.md b/README.md index 4ad16ff..9d244c9 100644 --- a/README.md +++ b/README.md @@ -41,18 +41,21 @@ To transfer data from one Riak cluster to another: on a bucket. 3. Export the contents of a bucket (Riak objects) using the ```-d``` option, to files on disk (the objects will be stored in the binary [ProtoBuf](http://docs.basho.com/riak/latest/references/apis/protocol-buffers/) format) -4. Load the Riak objects from the exported files into the target cluster using the ```-l``` option. +4. (Optional, Search-only) If backing up Search-indexed buckets using Data Migrator versions <= 0.2.6, go into the exported + data directory and delete the internal-use-only indexing buckets (```rm -rf _rsid_*```). See + https://github.com/basho/riak-data-migrator/issues/4 for explanation. +5. Load the Riak objects from the exported files into the target cluster using the ```-l``` option. Downloading: ------------------------ You can download the ready to run jar file at: -http://ps-tools.data.riakcs.net:8080/riak-data-migrator-0.2.4-bin.tar.gz +http://ps-tools.data.riakcs.net:8080/riak-data-migrator-0.2.6-bin.tar.gz After downloading, unzip/untar it, and it's ready to run from its directory. ```bash -tar -xvzf riak-data-migrator-0.2.4-bin.tar.gz -cd riak-data-migrator-0.2.4 -java -jar riak-data-migrator-0.2.4.jar [options] +tar -xvzf riak-data-migrator-0.2.6-bin.tar.gz +cd riak-data-migrator-0.2.6 +java -jar riak-data-migrator-0.2.6.jar [options] ``` Building from source: @@ -67,14 +70,16 @@ mvn package ``` The compiled .jar file is located in the ```target/``` directory. - The usable binary file is ```riak-data-migrator-0.2.4-bin.tar.gz``` + The usable binary file is ```riak-data-migrator-0.2.6-bin.tar.gz``` Usage: ------------------------ Usage: -```java -jar riak-data-migrator-0.2.4.jar [options]``` + +```java -jar riak-data-migrator-0.2.6.jar [options]``` Options: + ``` Data Transfer (required, one of: d, l, k, or delete) -d Export (Dump) the contents bucket(s) (keys and objects), in ProtoBuf format, to files @@ -87,7 +92,7 @@ Settings Transfer (optional, used with to -d or -l) You must also specify -d to export or -l to import, with this option. Delete a bucket ---delete Delete bucket data. Cannot be used with -d, -l, -k, or -t. Must be used with -b or -f +--delete Delete bucket data. Cannot be used with -d, -l, -k, or -t. 
Must be used with -b or -f Path (required) -r Set the path for data to be loaded to or dumped from (path must be valid) @@ -96,6 +101,8 @@ Bucket Options (required for -d, -k or -t) -a Export all buckets. -b Export a single bucket. -f Export multiple buckets listed in a file (containing line-delimited bucket names) +--loadkeys Specify multiple keys listed in a file (containing line-delimited bucket,key names). Not functional with the -l option. +--bucketkeys Specify keys listed in a line-delimited file in conjunction with the -b option to specify which bucket the keys belong to. Not functional with the -l option. Cluster Addresses and Ports (required) -h Specify Riak hostname. Required if a cluster host name file is not specified. @@ -115,7 +122,7 @@ Concurrency and Misc Settings at most 2 queues for Load/Dump operations. Copy Settings ---copy Set to Copy buckets to one cluster to another. Cannot be used with d, k, l, k or delete. +--copy Set to Copy buckets from one cluster to another. Cannot be used with d, k, l or delete. --copyhost Specify destination Riak host for *copy* operation --copyhostsfile Specify a file containing Cluster Host Names. Req'd if a single copyhost not specified. @@ -127,52 +134,105 @@ Copy Settings Examples: ------------------------- + Dump (the contents of) all buckets from Riak: -```java -jar riak-data-migrator-0.2.4.jar -d -r /var/riak_export -a -h 127.0.0.1 -p 8087 -H 8098``` + +``` +java -jar riak-data-migrator-0.2.6.jar -d -r /var/riak_export -a -h 127.0.0.1 -p 8087 -H 8098 +``` + +Dump a subset of keys from Riak: + +``` +java -jar riak-data-migrator-0.2.6.jar -d -r /var/riak_export --loadkeys bucketKeyNameFile.txt -h 127.0.0.1 -p 8087 -H 8098 +``` Load all buckets previously dumped back into Riak: -```java -jar riak-data-migrator-0.2.4.jar -l -r /var/riak-export -a -h 127.0.0.1 -p 8087 -H 8098``` + +``` +java -jar riak-data-migrator-0.2.6.jar -l -r /var/riak-export -a -h 127.0.0.1 -p 8087 -H 8098 +``` Dump (the contents of) buckets listed in a line delimited file from a Riak cluster: -
-java -jar riak-data-migrator-0.2.4.jar -d -f /home/riakadmin/buckets_to_export.txt -r \  
+
+```
+java -jar riak-data-migrator-0.2.6.jar -d -f /home/riakadmin/buckets_to_export.txt -r \
 /var/riak-export -c /home/riakadmin/riak_hosts.txt -p 8087 -H 8098
-
+``` + +Dump (the contents of) buckets based on the bucket,keys listed in a file, bucket_keys.txt: + +``` +java -jar riak-data-migrator-0.2.6.jar -d --loadkeys bucket_keys.txt -r \ +/var/riak-export -c /home/riakadmin/riak_hosts.txt -p 8087 -H 8098 +``` + +Dump (the contents of) a single bucket, A_BUCKET, based on the keys listed in a file, keys.txt: + +``` +java -jar riak-data-migrator-0.2.6.jar -d -b A_BUCKET --bucketkeys keys.txt -r \ +/var/riak-export -c /home/riakadmin/riak_hosts.txt -p 8087 -H 8098 +``` + Export only the bucket settings from a bucket named "Flights": -```java -jar riak-data-migrator-0.2.4.jar -d -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098``` + +``` +java -jar riak-data-migrator-0.2.6.jar -d -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098 +``` Load bucket settings for a bucket named "Flights": -```java -jar riak-data-migrator-0.2.4.jar -l -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098``` + +``` +java -jar riak-data-migrator-0.2.6.jar -l -t -r /var/riak-export -b Flights -h 127.0.0.1 -p 8087 -H 8098 +``` Copy all buckets from one riak host to another: -```java -jar riak-data-migrator-0.2.4.jar -copy -r /var/riak_export -a -h 127.0.0.1 -p 8087 --copyhost 192.168.1.100 --copypbport 8087``` + +``` +java -jar riak-data-migrator-0.2.6.jar -copy -r /var/riak_export -a -h 127.0.0.1 -p 8087 --copyhost 192.168.1.100 --copypbport 8087 +``` + +Copy a single bucket (A_BUCKET) to a destination bucket (B_BUCKET) on another host: + +``` +java -jar riak-data-migrator-0.2.6.jar -copy -r /var/riak_export -b A_BUCKET -h 127.0.0.1 -p 8087 --copyhost 192.168.1.100 --copypbport 8087 --destinationbucket B_BUCKET +``` + Caveats: ------------------------ --This app depends on the key listing operation in the Riak client which + - When backing up: + - This app depends on the key listing operation in the Riak client which is slow on a good day. --The Riak memory backend bucket listing operating tends to timeout if + - The Riak memory backend bucket listing operation tends to time out if any significant amount of data exists. In this case, you have to explicitly specify the buckets you need want to dump using the ```-f``` option to specify a line-delimited list of buckets in a file. + Version Notes: ------------------------ +0.2.6 + - Subset of keys can now be dumped, copied, or deleted. + - In addition to the previous capability to specify a file in bucket,key\n format, you can now just have a file with keys + as long as a single bucket is also specified + +0.2.5 + - Added option to dump a subset of keys + 0.2.4 --Verbose status output is now default --Added option to turn off verbose output --Logging of final status + - Verbose status output is now default + - Added option to turn off verbose output + - Logging of final status 0.2.3 --Changed internal message passing between threads from Riak Objects to Events for Dump, Load and Copy operations but not Delete. --Added the capability to transfer data directly between clusters --Added the capability to copy a single bucket into a new bucket for the Load or Copy operations.
+ - Changed log level for retry attempts (but not max retries reached) to warn vs error. 0.2.2 --Changed message passing for Dump partially to Events --Added logic to count the number of value not founds (ie 404s) when reading --Added summary output for value not founds + - Changed message passing for Dump partially to Events + - Added logic to count the number of value not founds (ie 404s) when reading + - Added summary output for value not founds -< 0.2.1 Ancient History diff --git a/pom.xml b/pom.xml index 64f7426..199ae41 100644 --- a/pom.xml +++ b/pom.xml @@ -86,5 +86,5 @@ 1.0.9 - 0.2.4 + 0.2.6 \ No newline at end of file diff --git a/src/main/java/com/basho/proserv/datamigrator/BucketDelete.java b/src/main/java/com/basho/proserv/datamigrator/BucketDelete.java index 8a39269..9c52bd6 100644 --- a/src/main/java/com/basho/proserv/datamigrator/BucketDelete.java +++ b/src/main/java/com/basho/proserv/datamigrator/BucketDelete.java @@ -2,8 +2,10 @@ import java.io.File; import java.io.IOException; +import java.util.Map; import java.util.Set; +import com.basho.proserv.datamigrator.io.IKeyJournal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +28,28 @@ public BucketDelete(Connection connection, boolean verboseOutput) { this.connection = connection; this.verboseOutput = verboseOutput; } - + + public long deleteKeys(IKeyJournal keyJournal) { + if (!this.connection.connected()) { + log.error("Not connected to Riak"); + return 0; + } + int objectCount = 0; + + File dataRoot = this.getTemporaryPath(true); + if (dataRoot == null) { + return -1; + } + + Map keyJournals = Utilities.splitKeys(dataRoot, keyJournal); + + for (String bucketName : keyJournals.keySet()) { + objectCount += deleteBucket(bucketName, keyJournals.get(bucketName)); + } + + return objectCount; + } + public long deleteBuckets(Set bucketNames) { if (bucketNames == null || bucketNames.size() == 0) { throw new IllegalArgumentException("bucketNames must not be null and must not be sized 0"); @@ -37,11 +60,32 @@ public long deleteBuckets(Set bucketNames) { for (String bucketName : bucketNames) { objectCount += this.deleteBucket(bucketName); } + + this.connection.close(); return objectCount; } - - public long deleteBucket(String bucketName) { + + public long deleteBucket(String bucketName) { + File keyPath = this.getTemporaryPath(false); + if (keyPath == null) { + return -1; + } + + try { + dumpBucketKeys(bucketName, keyPath); + } catch (IOException e) { + log.error("Error listing keys", e); + this.summary.addStatistic(bucketName, -2l, 0l, 0l, 0l); + return -2; + } + + KeyJournal keys = new KeyJournal(keyPath, KeyJournal.Mode.READ); + + return deleteBucket(bucketName, keys); + } + + public long deleteBucket(String bucketName, IKeyJournal keyJournal) { if (bucketName == null || bucketName.isEmpty()) { throw new IllegalArgumentException("bucketName must not be null or empty"); } @@ -51,27 +95,10 @@ public long deleteBucket(String bucketName) { } long objectCount = 0; - File keyPath = null; - try { - keyPath = File.createTempFile("riak-data-migrator", "bucketName"); - keyPath.deleteOnExit(); - } catch (IOException e){ - log.error("Could not create temporary key list file", e); - return -1; - } - - long start = System.currentTimeMillis(); - long keyCount = 0; - try { - keyCount = dumpBucketKeys(bucketName, keyPath); - } catch (IOException e) { - log.error("Error listing keys", e); - this.summary.addStatistic(bucketName, -2l, 0l, 0l, 0l); - return -2; - } - - KeyJournal keys = new KeyJournal(keyPath, KeyJournal.Mode.READ); 
- AbstractClientDataDeleter deleter = new ThreadedClientDataDeleter(connection, keys); + long start = System.currentTimeMillis(); + + long keyCount = keyJournal.getKeyCount(); + AbstractClientDataDeleter deleter = new ThreadedClientDataDeleter(connection, keyJournal); try { @SuppressWarnings("unused") @@ -96,8 +123,6 @@ public long deleteBucket(String bucketName) { this.printStatus(keyCount, objectCount, true); } - this.connection.close(); - return objectCount; } @@ -112,6 +137,22 @@ public long dumpBucketKeys(String bucketName, File filePath) throws IOException keyJournal.close(); return keyCount; } + + private File getTemporaryPath(boolean directory) { + File keyPath = null; + try { + keyPath = File.createTempFile("riak-data-migrator", "bucketName"); + if (directory) { + keyPath.delete(); + keyPath.mkdir(); + } + keyPath.deleteOnExit(); + } catch (IOException e){ + log.error("Could not create temporary key list file", e); + } + + return keyPath; + } private void printStatus(long keyCount, long objectCount, boolean force) { long end = System.currentTimeMillis(); diff --git a/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java b/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java index cf0b8d0..69349a4 100644 --- a/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java +++ b/src/main/java/com/basho/proserv/datamigrator/BucketDumper.java @@ -2,6 +2,7 @@ import java.io.File; import java.io.IOException; +import java.util.Map; import java.util.Set; import org.slf4j.Logger; @@ -9,6 +10,7 @@ import com.basho.proserv.datamigrator.events.Event; import com.basho.proserv.datamigrator.events.RiakObjectEvent; +import com.basho.proserv.datamigrator.io.IKeyJournal; import com.basho.proserv.datamigrator.io.KeyJournal; import com.basho.proserv.datamigrator.io.RiakObjectBucket; import com.basho.proserv.datamigrator.riak.AbstractClientDataReader; @@ -42,7 +44,7 @@ public BucketDumper(Connection connection, Connection httpConnection, Configurat if (config.getFilePath() == null) { throw new IllegalArgumentException("dataRoot cannot be null"); } - + this.connection = connection; this.httpConnection = httpConnection; this.config = config; @@ -94,13 +96,29 @@ public long dumpBuckets(Set bucketNames, boolean resume, boolean keysOnl } int objectCount = 0; for (String bucketName : bucketNames) { - objectCount += dumpBucket(bucketName, resume, keysOnly); + objectCount += dumpBucket(bucketName, null, resume, keysOnly); } return objectCount; } - + + public long dumpKeys(IKeyJournal keyJournal) { + if (!this.connection.connected()) { + log.error("Not connected to Riak"); + return 0; + } + int objectCount = 0; + + Map keyJournals = Utilities.splitKeys(this.dataRoot, keyJournal); + + for (String bucketName : keyJournals.keySet()) { + objectCount += dumpBucket(bucketName, keyJournals.get(bucketName), false, false); + } + + return objectCount; + } + // resume is unimplemented - public long dumpBucket(String bucketName, boolean resume, boolean keysOnly) { + public long dumpBucket(String bucketName, KeyJournal keys, boolean resume, boolean keysOnly) { if (bucketName == null || bucketName.isEmpty()) { throw new IllegalArgumentException("bucketName cannot be null or empty"); } @@ -127,37 +145,41 @@ public long dumpBucket(String bucketName, boolean resume, boolean keysOnly) { RiakObjectBucket dumpBucket = this.createBucket(bucketName); File keyPath = new File(dumpBucket.getFileRoot().getAbsoluteFile() + "/bucketkeys.keys"); - File dumpedKeyPath = new File(dumpBucket.getFileRoot().getAbsoluteFile() 
+ "/dumpedkeys.keys"); - - long keyCount = 0; - + File dumpedKeyPath = new File(dumpBucket.getFileRoot().getAbsoluteFile() + "/dumpedkeys.keys"); + + IKeyJournal writeDestinationKeyJournal = new KeyJournal(dumpedKeyPath, KeyJournal.Mode.WRITE); + try { - keyCount = this.dumpBucketKeys(bucketName, keyPath); + // If no key subset is specified, get the entire key set + if (keys == null) { + IKeyJournal writeSourceKeyJournal = new KeyJournal(keyPath, KeyJournal.Mode.WRITE); + Iterable listedKeys = this.connection.riakClient.listKeys(bucketName); + writeSourceKeyJournal.populate(bucketName, listedKeys); + writeSourceKeyJournal.close(); + if (this.verboseStatusOutput) { + this.printStatus(writeSourceKeyJournal.getKeyCount(), objectCount, false); + } + if (keysOnly) { + String bucketNameKeys= String.format("%s keys", bucketName); + this.summary.addStatistic(bucketNameKeys, writeSourceKeyJournal.getKeyCount(), System.currentTimeMillis()-start, 0l, 0l); + return writeSourceKeyJournal.getKeyCount(); + } + } } catch (IOException e){ log.error("Error listing keys for bucket " + bucketName, e); this.summary.addStatistic(bucketName, -2l, 0l, 0l, 0l); return 0; } - - if (keysOnly) { - String bucketNameKeys= String.format("%s keys", bucketName); - this.summary.addStatistic(bucketNameKeys, keyCount, System.currentTimeMillis()-start, 0l, 0l); - return keyCount; - } - - KeyJournal bucketKeys = new KeyJournal(keyPath, KeyJournal.Mode.READ); - + + IKeyJournal readSourceKeyJournal = keys == null ? new KeyJournal(keyPath, KeyJournal.Mode.READ) : keys; + // this.saveBucketSettings(bucketName, dumpBucket.getFileRoot()); - KeyJournal keyJournal = new KeyJournal( - dumpedKeyPath, - KeyJournal.Mode.WRITE); - try { // self closing AbstractClientDataReader reader = new ThreadedClientDataReader(connection, - new ClientReaderFactory(), - bucketKeys, + new ClientReaderFactory(), + readSourceKeyJournal, this.config.getRiakWorkerCount(), this.config.getQueueSize()); @@ -167,16 +189,14 @@ public long dumpBucket(String bucketName, boolean resume, boolean keysOnly) { RiakObjectEvent riakEvent = event.asRiakObjectEvent(); dumpBucket.writeRiakObject(riakEvent); objectCount += riakEvent.count(); - keyJournal.write(riakEvent.key()); + writeDestinationKeyJournal.write(riakEvent.key()); } else if (event.isValueErrorEvent()) { // Count not-founds ++valueErrorCount; } else if (event.isIoErrorEvent()) { // Exit on IOException retry reached throw new IOException(event.asIoErrorEvent().ioException()); } - if (this.verboseStatusOutput) { - this.printStatus(keyCount, objectCount, false); - } + } } catch (IOException e) { log.error("Riak error dumping objects for bucket: " + bucketName, e); @@ -185,7 +205,7 @@ public long dumpBucket(String bucketName, boolean resume, boolean keysOnly) { } catch (InterruptedException e) { //no-op } finally { - keyJournal.close(); + writeDestinationKeyJournal.close(); dumpBucket.close(); } @@ -202,7 +222,7 @@ public long dumpBucket(String bucketName, boolean resume, boolean keysOnly) { } if (this.verboseStatusOutput) { - this.printStatus(keyCount, objectCount, true); + this.printStatus(readSourceKeyJournal.getKeyCount(), objectCount, true); } return objectCount; @@ -212,19 +232,7 @@ public long dumpBucket(String bucketName, boolean resume, boolean keysOnly) { public int errorCount() { return errorCount; } - - public long dumpBucketKeys(String bucketName, File filePath) throws IOException { - KeyJournal keyJournal = new KeyJournal(filePath, KeyJournal.Mode.WRITE); - long keyCount = 0; - Iterable keys = 
this.connection.riakClient.listKeys(bucketName); - for (String keyString : keys) { - keyJournal.write(bucketName, keyString); - ++keyCount; - } - keyJournal.close(); - return keyCount; - } - + private void saveBucketSettings(String bucketName, File path) { File xmlPath = RiakBucketProperties.createBucketSettingsFile(path); RiakBucketProperties riakBucketProps = new RiakBucketProperties(this.httpConnection); @@ -261,7 +269,7 @@ private String createBucketPath(String bucketName) { String encodedBucketName = Utilities.urlEncode(bucketName); return this.dataRoot.getAbsolutePath() + "/" + encodedBucketName; } - + private RiakObjectBucket createBucket(String bucketName) { String bucketRootPath = this.createBucketPath(bucketName); File bucketRoot = new File(bucketRootPath); diff --git a/src/main/java/com/basho/proserv/datamigrator/BucketLoader.java b/src/main/java/com/basho/proserv/datamigrator/BucketLoader.java index 7b677fd..e18f0fd 100644 --- a/src/main/java/com/basho/proserv/datamigrator/BucketLoader.java +++ b/src/main/java/com/basho/proserv/datamigrator/BucketLoader.java @@ -10,6 +10,7 @@ import com.basho.proserv.datamigrator.events.Event; import com.basho.proserv.datamigrator.io.Key; +import com.basho.proserv.datamigrator.io.IKeyJournal; import com.basho.proserv.datamigrator.io.KeyJournal; import com.basho.proserv.datamigrator.io.RiakObjectBucket; import com.basho.proserv.datamigrator.riak.AbstractClientDataWriter; @@ -125,8 +126,8 @@ public long LoadBucket(String bucketName) { new ThreadedClientDataWriter(connection, clientWriterFactory, dumpBucket, this.config.getRiakWorkerCount(), this.config.getQueueSize()); - KeyJournal keyJournal = new KeyJournal( - KeyJournal.createKeyPathFromPath(new File(this.createBucketPath(bucketName, true) + "/keys" ), true), + IKeyJournal keyJournal = new KeyJournal( + KeyJournal.createKeyPathFromPath(new File(this.createBucketPath(bucketName, true) + "/keys" ), true), KeyJournal.Mode.WRITE); try { diff --git a/src/main/java/com/basho/proserv/datamigrator/BucketTransfer.java b/src/main/java/com/basho/proserv/datamigrator/BucketTransfer.java index daeabee..0bf670e 100644 --- a/src/main/java/com/basho/proserv/datamigrator/BucketTransfer.java +++ b/src/main/java/com/basho/proserv/datamigrator/BucketTransfer.java @@ -2,6 +2,7 @@ import java.io.File; import java.io.IOException; +import java.util.Map; import java.util.Set; import org.slf4j.Logger; @@ -9,6 +10,7 @@ import com.basho.proserv.datamigrator.events.Event; import com.basho.proserv.datamigrator.events.RiakObjectEvent; +import com.basho.proserv.datamigrator.io.IKeyJournal; import com.basho.proserv.datamigrator.io.KeyJournal; import com.basho.proserv.datamigrator.riak.ClientReaderFactory; import com.basho.proserv.datamigrator.riak.ClientWriterFactory; @@ -44,7 +46,24 @@ public BucketTransfer(Connection from, this.verboseStatusOutput = this.configuration.getVerboseStatus(); } - + + public long transferKeys(IKeyJournal keyJournal) { + if (!this.fromConnection.connected()) { + log.error("Not connected to Riak"); + return 0; + } + int objectCount = 0; + + File dataRoot = this.getTemporaryPath(true); + Map keyJournals = Utilities.splitKeys(dataRoot, keyJournal); + + for (String bucketName : keyJournals.keySet()) { + objectCount += transferBucket(bucketName, keyJournals.get(bucketName), false); + } + + return objectCount; + } + public long transferAllBuckets(boolean resume) { Set buckets = null; if (this.fromConnection.connected()) { @@ -74,8 +93,27 @@ public long transferBuckets(Set bucketNames, boolean resume) { 
} return objectCount; } - - public long transferBucket(String bucketName, boolean resume) { + + public long transferBucket(String bucketName, boolean resume) { + File keyPath = getTemporaryPath(false); + if (keyPath == null) { + return -1; + } + + try { + dumpBucketKeys(bucketName, keyPath); + } catch (IOException e) { + log.error("Error listing keys", e); + this.summary.addStatistic(bucketName, -2l, 0l, 0l, 0l); + return -2; + } + + KeyJournal bucketKeys = new KeyJournal(keyPath, KeyJournal.Mode.READ); + + return transferBucket(bucketName, bucketKeys, resume); + } + + public long transferBucket(String bucketName, IKeyJournal keyJournal, boolean resume) { if (bucketName == null || bucketName.isEmpty()) { throw new IllegalArgumentException("bucketName cannot be null or empty"); } @@ -88,28 +126,9 @@ public long transferBucket(String bucketName, boolean resume) { long valueErrorCount = 0; long dataSize = 0; long start = System.currentTimeMillis(); - long keyCount = 0; + long keyCount = keyJournal.getKeyCount(); boolean error = false; - - File keyPath = null; - try { - keyPath = File.createTempFile("riak-data-migrator", bucketName); - keyPath.deleteOnExit(); - } catch (IOException e){ - log.error("Could not create temporary key list file", e); - return -1; - } - - try { - keyCount = dumpBucketKeys(bucketName, keyPath); - } catch (IOException e) { - log.error("Error listing keys", e); - this.summary.addStatistic(bucketName, -2l, 0l, 0l, 0l); - return -2; - } - - KeyJournal bucketKeys = new KeyJournal(keyPath, KeyJournal.Mode.READ); - + ClientWriterFactory clientWriterFactory = new ClientWriterFactory(); clientWriterFactory.setBucketRename(this.configuration.getDestinationBucket()); @@ -117,7 +136,7 @@ public long transferBucket(String bucketName, boolean resume) { // self closing ThreadedClientDataReader reader = new ThreadedClientDataReader(fromConnection, new ClientReaderFactory(), - bucketKeys, + keyJournal, this.configuration.getRiakWorkerCount(), this.configuration.getQueueSize()); @@ -170,7 +189,7 @@ public long transferBucket(String bucketName, boolean resume) { } private long dumpBucketKeys(String bucketName, File filePath) throws IOException { - KeyJournal keyJournal = new KeyJournal(filePath, KeyJournal.Mode.WRITE); + IKeyJournal keyJournal = new KeyJournal(filePath, KeyJournal.Mode.WRITE); long keyCount = 0; Iterable keys = this.fromConnection.riakClient.listKeys(bucketName); for (String keyString : keys) { @@ -180,6 +199,22 @@ private long dumpBucketKeys(String bucketName, File filePath) throws IOException keyJournal.close(); return keyCount; } + + private File getTemporaryPath(boolean directory) { + File keyPath = null; + try { + keyPath = File.createTempFile("riak-data-migrator", "bucketName"); + if (directory) { + keyPath.delete(); + keyPath.mkdir(); + } + keyPath.deleteOnExit(); + } catch (IOException e){ + log.error("Could not create temporary key list file", e); + } + + return keyPath; + } private void printStatus(long keyCount, long objectCount, boolean force) { long end = System.currentTimeMillis(); diff --git a/src/main/java/com/basho/proserv/datamigrator/Configuration.java b/src/main/java/com/basho/proserv/datamigrator/Configuration.java index 5ed3a81..8d1a266 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Configuration.java +++ b/src/main/java/com/basho/proserv/datamigrator/Configuration.java @@ -1,5 +1,7 @@ package com.basho.proserv.datamigrator; +import com.basho.proserv.datamigrator.io.IKeyJournal; + import java.io.File; import java.util.Collection; import 
java.util.HashSet; @@ -17,11 +19,15 @@ public static enum Mode { LOAD, DUMP }; public static enum Operation { ALL_BUCKETS, BUCKETS, ALL_KEYS, - BUCKET_KEYS, + BUCKET_KEYS, + KEYS, BUCKET_PROPERTIES, DELETE_BUCKETS, + DELETE_KEYS, COPY_ALL, - COPY_BUCKETS }; + COPY_BUCKETS, + COPY_KEYS + }; @@ -39,6 +45,9 @@ public static enum Operation { ALL_BUCKETS, private int queueSize = DEFAULT_QUEUE_SIZE; private Set bucketNames = new HashSet(); +// private Set keyNames = new HashSet(); + + private IKeyJournal keyJournal = null; private boolean verboseStatus = true; private boolean resetVClock = false; @@ -140,6 +149,21 @@ public void addBucketNames(Collection buckets) { public Set getBucketNames() { return this.bucketNames; } + + public void setKeyJournal(IKeyJournal keyJournal) { + this.keyJournal = keyJournal; + } + + public IKeyJournal getKeyJournal() { + return this.keyJournal; + } + +// public void addKeyNames(Collection keys) { +// this.keyNames.addAll(keys); +// } +// public Set getKeyNames() { +// return this.keyNames; +// } public void setVerboseStatus(boolean verboseStatus) { this.verboseStatus = verboseStatus; diff --git a/src/main/java/com/basho/proserv/datamigrator/Main.java b/src/main/java/com/basho/proserv/datamigrator/Main.java index 375c69c..64b8a57 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Main.java +++ b/src/main/java/com/basho/proserv/datamigrator/Main.java @@ -3,6 +3,9 @@ import java.io.File; import java.util.Map; +import com.basho.proserv.datamigrator.io.AbstractKeyJournal; +import com.basho.proserv.datamigrator.io.BucketKeyJournal; +import com.basho.proserv.datamigrator.io.KeyJournal; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -77,6 +80,23 @@ public static void main(String[] args) { if (cmd.hasOption("copy") && (cmd.hasOption('t') || cmd.hasOption('k'))) { System.out.println("Copy not compatible with t or k options."); } + + if (cmd.hasOption("loadkeys") && cmd.hasOption("bucketkeys")) { + System.out.println("Cannot combine loadkeys and bucketkeys options."); + System.exit(1); + } + +// if ((cmd.hasOption("loadkeys") || cmd.hasOption("bucketkeys")) && !cmd.hasOption("r")) { +// System.out.println("loadkeys and bucketkeys require an output directory (r option)"); +// } + if ((cmd.hasOption("loadkeys") || cmd.hasOption("bucketkeys")) && cmd.hasOption("l")) { + System.out.println("loadkeys and bucketkeys cannot be used with the load (l) option"); + } + + if ((cmd.hasOption("loadkeys") || cmd.hasOption("bucketkeys")) && + !(cmd.hasOption("d") || cmd.hasOption("copy") | cmd.hasOption("delete"))) { + System.out.println("loadkeys and bucketkeys must be used with dump, copy, or delete options (d, copy, delete"); + } Configuration config = handleCommandLine(cmd); @@ -92,17 +112,17 @@ public static void main(String[] args) { runLoader(config); } - if (cmd.hasOption("d") || cmd.hasOption("k") || (cmd.hasOption("d") && cmd.hasOption("t"))) { + if (cmd.hasOption("d") || (cmd.hasOption("d") && cmd.hasOption("t"))) { runDumper(config); } - + } public static Configuration handleCommandLine(CommandLine cmd) { Configuration config = new Configuration(); // Data path - if (!cmd.hasOption("copy")) { + if (!cmd.hasOption("copy") && !cmd.hasOption("delete")) { if (cmd.hasOption("r")) { File dataPath = new File(cmd.getOptionValue("r")); if (!dataPath.exists()) { @@ -167,7 +187,7 @@ public static Configuration handleCommandLine(CommandLine cmd) { System.exit(1); } } else { - 
System.out.println("Port not specified, using the default: 8087"); + System.out.println("PB Port not specified, using the default: 8087"); } // HTTP Port @@ -183,44 +203,81 @@ public static Configuration handleCommandLine(CommandLine cmd) { } // Destination PB port - if (cmd.hasOption("copypbport")) { + if (cmd.hasOption("copy") && cmd.hasOption("copypbport")) { try { config.setPort(Integer.parseInt(cmd.getOptionValue("copypbport"))); } catch (NumberFormatException e) { System.out.println("Destination PB Port (copypbport) argument is not an integer."); System.exit(1); } - } else { + } else if (cmd.hasOption("copy") && !cmd.hasOption("copypbport")) { System.out.println("Destination PB Port not specified, using the default: 8087"); } // Single bucket specifier if (cmd.hasOption("b")) { - config.addBucketName(cmd.getOptionValue("b")); + config.addBucketName(Utilities.urlDecode(cmd.getOptionValue("b"))); config.setOperation(Configuration.Operation.BUCKETS); } // Bucket filename if (cmd.hasOption("f")) { try { - config.addBucketNames(Utilities.readFileLines(cmd.getOptionValue("f"))); + config.addBucketNames(Utilities.urlDecode(Utilities.readFileLines(cmd.getOptionValue("f")))); config.setOperation(Configuration.Operation.BUCKETS); } catch (Exception e) { System.out.println("Could not read file containing buckets"); System.exit(1); } } + + // Dump from a list of buckets/keys + if (cmd.hasOption("loadkeys")) { + try { + String fileName = cmd.getOptionValue("loadkeys"); + File path = new File(fileName); + + KeyJournal keyJournal = new KeyJournal(path, KeyJournal.Mode.READ); + + config.setKeyJournal(keyJournal); + + config.setOperation(Configuration.Operation.KEYS); + +// config.addKeyNames(Utilities.readFileLines(fileName)); + } catch (Exception e) { + System.out.println("Could not read file containing list of bucket,keys"); + System.exit(1); + } + } + + if (cmd.hasOption("bucketkeys")) { + + if (config.getBucketNames().size() == 1) { + String fileName = cmd.getOptionValue("bucketkeys"); + File path = new File(fileName); + String bucketName = config.getBucketNames().iterator().next(); + BucketKeyJournal keyJournal = new BucketKeyJournal(path, KeyJournal.Mode.READ, bucketName); + + config.setKeyJournal(keyJournal); + + config.setOperation(Configuration.Operation.KEYS); + } else { + System.out.println("bucketkeys only a valid option when specifying a single bucket"); + System.exit(1); + } + } + // Keys only if (cmd.hasOption("k")) { // if keys only.... 
config.setOperation(Configuration.Operation.BUCKET_KEYS); } // Bucket properties transfer - if (cmd.hasOption("t")) { // if transfer buckets, no compatible with k + if (cmd.hasOption("t")) { // if transfer buckets, not compatible with k config.setOperation(Configuration.Operation.BUCKET_PROPERTIES); } - if (config.getBucketNames().size() == 0 && !cmd.hasOption("a")) { + if (config.getBucketNames().size() == 0 && !cmd.hasOption("a") && !cmd.hasOption("loadkeys")) { System.out.println("No buckets specified to load"); System.exit(1); } @@ -241,13 +298,28 @@ public static Configuration handleCommandLine(CommandLine cmd) { if (config.getBucketNames().size() > 0) { config.setOperation(Configuration.Operation.DELETE_BUCKETS); } + if (cmd.hasOption("bucketkeys") && config.getBucketNames().size() == 1) { + config.setOperation(Configuration.Operation.DELETE_KEYS); + } + if (cmd.hasOption("loadkeys")) { + config.setOperation(Configuration.Operation.DELETE_KEYS); + } } - + + // Subset copy + if (cmd.hasOption("copy") && cmd.hasOption("loadkeys")) { + config.setOperation(Configuration.Operation.COPY_KEYS); + } + + if (cmd.hasOption("copy") && cmd.hasOption("bucketkeys") && config.getBucketNames().size() == 1) { + config.setOperation(Configuration.Operation.COPY_KEYS); + } + // Bucket Copy - if (cmd.hasOption("copy") && config.getBucketNames().size() > 0) { + if (cmd.hasOption("copy") && config.getBucketNames().size() > 0 && !cmd.hasOption("bucketkeys") && !cmd.hasOption("loadkeys")) { config.setOperation(Configuration.Operation.COPY_BUCKETS); } - if (cmd.hasOption("copy") && cmd.hasOption("a")) { + if (cmd.hasOption("copy") && cmd.hasOption("a") && !cmd.hasOption("bucketkeys")) { config.setOperation(Configuration.Operation.COPY_ALL); } @@ -256,7 +328,7 @@ public static Configuration handleCommandLine(CommandLine cmd) { System.out.println("Destination bucket option only valid when specifying a single bucket."); System.exit(1); } - config.setDestinationBucket(cmd.getOptionValue("destinationbucket")); + config.setDestinationBucket(Utilities.urlDecode(cmd.getOptionValue("destinationbucket"))); } if (cmd.hasOption("q")) { @@ -314,7 +386,9 @@ public static void runDelete(Configuration config) { long deleteCount = 0; if (config.getOperation() == Configuration.Operation.DELETE_BUCKETS) { deleteCount = deleter.deleteBuckets(config.getBucketNames()); - } + } else if (config.getOperation() == Configuration.Operation.DELETE_KEYS) { + deleteCount = deleter.deleteKeys(config.getKeyJournal()); + } connection.close(); @@ -396,6 +470,8 @@ public static void runDumper(Configuration config) { dumpCount = dumper.dumpBuckets(config.getBucketNames(), config.getResume(), keysOnly); } else if (config.getOperation() == Configuration.Operation.BUCKET_PROPERTIES) { dumpCount = dumper.dumpBucketSettings(config.getBucketNames()); + } else if (config.getOperation() == Configuration.Operation.KEYS) { + dumpCount = dumper.dumpKeys(config.getKeyJournal()); } else { dumpCount = dumper.dumpAllBuckets(config.getResume(), keysOnly); } @@ -439,7 +515,9 @@ private static void runCopy(Configuration config) { long transferCount = 0; if (config.getOperation() == Configuration.Operation.COPY_BUCKETS) { transferCount = mover.transferBuckets(config.getBucketNames(), false); - } else { + } else if (config.getOperation() == Configuration.Operation.COPY_KEYS) { + transferCount = mover.transferKeys(config.getKeyJournal()); + } else { transferCount = mover.transferAllBuckets(false); } @@ -531,6 +609,8 @@ private static Options createOptions() { 
options.addOption("a", false, "Load or Dump all buckets"); options.addOption("b", true, "Load or Dump a single bucket"); options.addOption("f", true, "Load or Dump a file containing bucket names"); + options.addOption("loadkeys", true, "Load or Dump a file containing bucket names and keys"); + options.addOption("bucketkeys", true, "Load or Dump a file containing keys. Bucket must be specified"); options.addOption("h", true, "Specify Riak Host"); options.addOption("c", true, "Specify a file containing Riak Cluster Host Names"); options.addOption("p", true, "Specify Riak PB Port"); diff --git a/src/main/java/com/basho/proserv/datamigrator/Utilities.java b/src/main/java/com/basho/proserv/datamigrator/Utilities.java index 3d9c875..292506f 100644 --- a/src/main/java/com/basho/proserv/datamigrator/Utilities.java +++ b/src/main/java/com/basho/proserv/datamigrator/Utilities.java @@ -1,17 +1,48 @@ package com.basho.proserv.datamigrator; +import com.basho.proserv.datamigrator.io.AbstractKeyJournal; +import com.basho.proserv.datamigrator.io.IKeyJournal; +import com.basho.proserv.datamigrator.io.Key; +import com.basho.proserv.datamigrator.io.KeyJournal; + import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; public class Utilities { + public static Map splitKeys(File basePath, IKeyJournal keyJournal) { + Map journals = new HashMap(); + Map readJournals = new HashMap(); + + + try { + for (Key key : keyJournal) { + String bucketName = key.bucket(); + if (!journals.containsKey(bucketName)) { + File bucketPath = new File(basePath.getAbsolutePath() + "/" + Utilities.urlEncode(bucketName)); + bucketPath.mkdir(); + File keyFile = new File(bucketPath.getAbsolutePath() + "/bucketkeys.keys"); + journals.put(key.bucket(), new KeyJournal(keyFile, KeyJournal.Mode.WRITE)); + } + + journals.get(bucketName).write(key); + } + + for (String bucketName: journals.keySet()) { + journals.get(bucketName).close(); + File bucketKeys = new File(basePath.getAbsolutePath() + "/" + Utilities.urlEncode(bucketName) + "/" + "bucketkeys.keys"); + readJournals.put(bucketName, new KeyJournal(bucketKeys, KeyJournal.Mode.READ)); + } + } catch (IOException ex) { + return null; + } + return readJournals; + } + public static List readFileLines(String filename) throws IOException, FileNotFoundException { List lines = new ArrayList(); File file = new File(filename); @@ -50,4 +81,16 @@ public static String urlDecode(String input) { return input; } } + + public static List urlDecode(Iterable lines) { + List decoded = new ArrayList(); + + for (String line : lines) { + decoded.add(urlDecode(line)); + } + + return decoded; + } + + } diff --git a/src/main/java/com/basho/proserv/datamigrator/io/AbstractKeyJournal.java b/src/main/java/com/basho/proserv/datamigrator/io/AbstractKeyJournal.java new file mode 100644 index 0000000..cadb5e6 --- /dev/null +++ b/src/main/java/com/basho/proserv/datamigrator/io/AbstractKeyJournal.java @@ -0,0 +1,214 @@ +package com.basho.proserv.datamigrator.io; + +import com.basho.proserv.datamigrator.Utilities; +import com.basho.riak.client.IRiakObject; +import com.basho.riak.pbc.RiakObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; 
+import java.io.FileOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.util.Iterator; + + +public abstract class AbstractKeyJournal implements Iterable, IKeyJournal { + private final Logger log = LoggerFactory.getLogger(AbstractKeyJournal.class); + + public enum Mode { READ, WRITE } + + protected final File path; + protected final Mode mode; + protected final BufferedWriter writer; + protected final BufferedReader reader; + private long keyCount; + private boolean closed = false; + + public AbstractKeyJournal(File path, Mode mode) { + if (path == null) { + throw new IllegalArgumentException("path cannot be null"); + } + this.path = path; + try { + if (mode == Mode.WRITE) { + this.writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(path))); + this.keyCount = 0; + this.reader = null; + } else { + this.reader = new BufferedReader(new InputStreamReader(new FileInputStream(path))); + this.keyCount = -1; + this.writer = null; + } + } catch (FileNotFoundException e) { + throw new IllegalArgumentException("Could not open " + path.getAbsolutePath()); + } + + this.mode = mode; + } + + public void populate(String bucketName, Iterable keys) throws IOException { + for (String keyString : keys) { + this.write(bucketName, keyString); + } + } + + public long getKeyCount() { + if (this.mode == Mode.READ && keyCount == -1) { + try { + this.keyCount = this.countKeys(this.path); + } catch (IOException ex) { + log.error("Could not read key file for counting"); + } + } + return this.keyCount; + } + + @Override + public void write(Key key) throws IOException { + if (key == null) { + throw new IllegalArgumentException("key must not be null"); + } + this.write(key.bucket(), key.key()); + } + + @Override + public void write(String bucket, String key) throws IOException { + if (mode == Mode.READ) { + throw new IllegalArgumentException ("KeyJournal is in READ mode for write operation"); + } + if (bucket == null || key == null) { + throw new IllegalArgumentException("bucket and key must not be null"); + } + this.writer.write((Utilities.urlEncode(bucket) + "," + Utilities.urlEncode(key) + "\n")); + this.keyCount++; + } + + @Override + public void write(RiakObject riakObject) throws IOException { + this.write(riakObject.getBucket(), riakObject.getKey()); + } + + @Override + public void write(IRiakObject riakObject) throws IOException { + this.write(riakObject.getBucket(), riakObject.getKey()); + } + + @Override + public Key read() throws IOException { + if (mode == Mode.WRITE) { + throw new IllegalArgumentException("KeyJournal is in WRITE mode for read operation"); + } + String line = this.reader.readLine(); + if (line == null) { + return null; + } + String[] values = new String[2]; + int comma = line.indexOf(','); + if (comma != -1) { + values[0] = line.substring(0, comma); + values[1] = line.substring(comma + 1, line.length()); + return new Key(Utilities.urlDecode(values[0]), Utilities.urlDecode(values[1])); + } + return null; + } + + public void close() { + try { + if (this.writer != null) { + this.writer.flush(); + this.writer.close(); + } + if (this.reader != null) { + this.reader.close(); + } + } catch (IOException e) { + // no-op, swallow + } + this.closed = true; + } + + public boolean isClosed() { + return this.closed; + } + + public static File createKeyPathFromPath(File file, boolean load) { + String path = file.getAbsolutePath(); + int ind = path.lastIndexOf('.'); + if (ind 
== -1) { + ind = path.length()-1; + } + path = path.substring(0, ind); + if (load) { + path = path + ".loadedkeys"; + } else { + path = path + ".keys"; + } + return new File(path); + } + + protected long countKeys(File path) throws IOException, FileNotFoundException { + BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path))); + + long count = 0; + while (reader.readLine() != null) { + ++count; + } + + reader.close(); + + return count; + } + + @Override + public Iterator iterator() { + return new KeyIterator(this); + } + + private class KeyIterator implements Iterator { + private final IKeyJournal keyJournal; + + private Key nextKey; + + public KeyIterator(IKeyJournal keyJournal) { + this.keyJournal = keyJournal; + try { + this.nextKey = keyJournal.read(); + } catch (IOException e) { + this.nextKey = null; + } + } + + @Override + public boolean hasNext() { + return this.nextKey != null; + } + + @Override + public Key next() { + Key currentKey = this.nextKey; + try { + this.nextKey = this.keyJournal.read(); + } catch (IOException e) { + this.nextKey = null; + } + if (currentKey == null && this.nextKey == null) { + currentKey = Key.createErrorKey(); + } + return currentKey; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + } + +} + diff --git a/src/main/java/com/basho/proserv/datamigrator/io/BucketKeyJournal.java b/src/main/java/com/basho/proserv/datamigrator/io/BucketKeyJournal.java new file mode 100644 index 0000000..a4a1e6c --- /dev/null +++ b/src/main/java/com/basho/proserv/datamigrator/io/BucketKeyJournal.java @@ -0,0 +1,53 @@ +package com.basho.proserv.datamigrator.io; + +import com.basho.proserv.datamigrator.Utilities; +import com.basho.riak.client.IRiakObject; +import com.basho.riak.pbc.RiakObject; + +import java.io.File; +import java.io.IOException; + +/** + * Created with IntelliJ IDEA. + * User: dankerrigan + * Date: 1/28/14 + * Time: 11:59 AM + * To change this template use File | Settings | File Templates. + */ +public class BucketKeyJournal extends AbstractKeyJournal { + private final String bucketName; + + public BucketKeyJournal(File path, Mode mode, String bucketName) { + super(path, mode); + + this.bucketName = bucketName; + } + + public void write(Key key) throws IOException { + throw new UnsupportedOperationException(); + } + + public void write(String bucket, String key) throws IOException { + throw new UnsupportedOperationException(); + } + + public void write(RiakObject riakObject) throws IOException { + throw new UnsupportedOperationException(); + } + + public void write(IRiakObject riakObject) throws IOException { + throw new UnsupportedOperationException(); + } + + public Key read() throws IOException { + if (this.mode == Mode.WRITE) { + throw new IllegalArgumentException("KeyJournal is in WRITE mode for read operation"); + } + String line = this.reader.readLine(); + if (line == null) { + return null; + } + + return new Key(this.bucketName, Utilities.urlDecode(line)); + } +} diff --git a/src/main/java/com/basho/proserv/datamigrator/io/IKeyJournal.java b/src/main/java/com/basho/proserv/datamigrator/io/IKeyJournal.java new file mode 100644 index 0000000..86bc3ab --- /dev/null +++ b/src/main/java/com/basho/proserv/datamigrator/io/IKeyJournal.java @@ -0,0 +1,34 @@ +package com.basho.proserv.datamigrator.io; + +import com.basho.riak.client.IRiakObject; +import com.basho.riak.pbc.RiakObject; + +import java.io.IOException; +import java.util.Iterator; + +/** + * Created with IntelliJ IDEA. 
+ * User: dankerrigan + * Date: 1/28/14 + * Time: 10:55 AM + * To change this template use File | Settings | File Templates. + */ +public interface IKeyJournal extends Iterable { + void populate(String bucketName, Iterable keys) throws IOException; + + void write(Key key) throws IOException; + + void write(String bucket, String key) throws IOException; + + void write(RiakObject riakObject) throws IOException; + + void write(IRiakObject riakObject) throws IOException; + + Key read() throws IOException; + + void close(); + + long getKeyCount(); + + Iterator iterator(); +} diff --git a/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java b/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java index ea7cdfe..cc1084f 100644 --- a/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java +++ b/src/main/java/com/basho/proserv/datamigrator/io/KeyJournal.java @@ -1,166 +1,18 @@ package com.basho.proserv.datamigrator.io; -import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; import java.util.Iterator; -import com.basho.proserv.datamigrator.Utilities; -import com.basho.riak.client.IRiakObject; -import com.basho.riak.pbc.RiakObject; +public class KeyJournal extends AbstractKeyJournal { -public class KeyJournal implements Iterable { - public enum Mode { READ, WRITE } - - private final Mode mode; - private final BufferedWriter writer; - private final BufferedReader reader; + private long writeCount; private boolean closed = false; public KeyJournal(File path, Mode mode) { - if (path == null) { - throw new IllegalArgumentException("path cannot be null"); - } - try { - if (mode == Mode.WRITE) { - this.writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(path))); - this.reader = null; - } else { - this.reader = new BufferedReader(new InputStreamReader(new FileInputStream(path))); - this.writer = null; - } - } catch (FileNotFoundException e) { - throw new IllegalArgumentException("Could not open " + path.getAbsolutePath()); - } - this.mode = mode; - } - - public void write(Key key) throws IOException { - if (key == null) { - throw new IllegalArgumentException("key must not be null"); - } - this.write(key.bucket(), key.key()); - } - - public void write(String bucket, String key) throws IOException { - if (mode == Mode.READ) { - throw new IllegalArgumentException ("KeyJournal is in READ mode for write operation"); - } - if (bucket == null || key == null) { - throw new IllegalArgumentException("bucket and key must not be null"); - } - this.writer.write((Utilities.urlEncode(bucket) + "," + Utilities.urlEncode(key) + "\n")); - } - - public void write(RiakObject riakObject) throws IOException { - this.write(riakObject.getBucket(), riakObject.getKey()); - } - - public void write(IRiakObject riakObject) throws IOException { - this.write(riakObject.getBucket(), riakObject.getKey()); - } - - public Key read() throws IOException { - if (mode == Mode.WRITE) { - throw new IllegalArgumentException("KeyJournal is in WRITE mode for read operation"); - } - String line = this.reader.readLine(); - if (line == null) { - return null; - } - String[] values = new String[2]; - int comma = line.indexOf(','); - if (comma != -1) { - values[0] = line.substring(0, comma); - values[1] = line.substring(comma + 1, line.length()); - return new 
Key(Utilities.urlDecode(values[0]), Utilities.urlDecode(values[1])); - } - return null; - } - - public void close() { - try { - if (this.writer != null) { - this.writer.flush(); - this.writer.close(); - } - if (this.reader != null) { - this.reader.close(); - } - } catch (IOException e) { - // no-op, swallow - } - this.closed = true; - } - public boolean isClosed() { - return this.closed; - } + super(path, mode); - @Override - public Iterator iterator() { - return new KeyIterator(this); - } - - public static File createKeyPathFromPath(File file, boolean load) { - String path = file.getAbsolutePath(); - int ind = path.lastIndexOf('.'); - if (ind == -1) { - ind = path.length()-1; - } - path = path.substring(0, ind); - if (load) { - path = path + ".loadedkeys"; - } else { - path = path + ".keys"; - } - return new File(path); } - - private class KeyIterator implements Iterator { - private final KeyJournal keyJournal; - - private Key nextKey; - - public KeyIterator(KeyJournal keyJournal) { - this.keyJournal = keyJournal; - try { - this.nextKey = keyJournal.read(); - } catch (IOException e) { - this.nextKey = null; - } - } - - @Override - public boolean hasNext() { - return this.nextKey != null; - } - - @Override - public Key next() { - Key currentKey = this.nextKey; - try { - this.nextKey = this.keyJournal.read(); - } catch (IOException e) { - this.nextKey = null; - } - if (currentKey == null && this.nextKey == null) { - currentKey = Key.createErrorKey(); - } - return currentKey; - } - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - } - } diff --git a/src/test/java/com/basho/proserv/datamigrator/BucketDumperLoaderTests.java b/src/test/java/com/basho/proserv/datamigrator/BucketDumperLoaderTests.java index d8efc8b..67362ba 100644 --- a/src/test/java/com/basho/proserv/datamigrator/BucketDumperLoaderTests.java +++ b/src/test/java/com/basho/proserv/datamigrator/BucketDumperLoaderTests.java @@ -4,6 +4,7 @@ import java.io.File; +import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -12,13 +13,16 @@ import com.basho.riak.client.builders.RiakObjectBuilder; public class BucketDumperLoaderTests { + @Rule + public TemporaryFolder tempFolderMaker = new TemporaryFolder(); + private final int DEFAULT_WORKER_COUNT = Runtime.getRuntime().availableProcessors() * 2; private String host = "127.0.0.1"; - private int port = 18087; - private int httpPort = 18098; + private int port = 8087; + private int httpPort = 8098; File dumpDirectory = null; - + public static String[] bucketNames = {"A","B","C","D","E","F"}; public static String[] keys = new String[100]; @@ -60,8 +64,7 @@ public void testDump() throws Exception { httpConnection.connectHTTPClient(host, httpPort); int count = loadTestData(connection); System.out.println("Loaded " + count + " records"); - - TemporaryFolder tempFolderMaker = new TemporaryFolder(); + dumpDirectory = tempFolderMaker.newFolder(); Configuration config = new Configuration(); diff --git a/src/test/java/com/basho/proserv/riak/datamigrator/io/KeyJournalTests.java b/src/test/java/com/basho/proserv/riak/datamigrator/io/KeyJournalTests.java index 7c335f0..9d3bede 100644 --- a/src/test/java/com/basho/proserv/riak/datamigrator/io/KeyJournalTests.java +++ b/src/test/java/com/basho/proserv/riak/datamigrator/io/KeyJournalTests.java @@ -2,8 +2,12 @@ import static org.junit.Assert.*; +import java.io.BufferedWriter; import java.io.File; +import java.io.FileWriter; +import com.basho.proserv.datamigrator.io.BucketKeyJournal; 
+import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -12,10 +16,12 @@ public class KeyJournalTests { + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + @Test public void test() throws Exception { int KEY_COUNT = 1000; - TemporaryFolder tempFolder = new TemporaryFolder(); File keyPath = tempFolder.newFile(); @@ -44,14 +50,44 @@ public void test() throws Exception { assertTrue(readCount == buckets.length * keys.length); } + + @Test + public void testBucketKeyJournal() throws Exception { + int KEY_COUNT = 1000; + String TEST_BUCKET = "test_bucket"; + + File keyPath = tempFolder.newFile(); + BufferedWriter writer = new BufferedWriter(new FileWriter(keyPath)); + + String[] keys = new String[KEY_COUNT]; + for (Integer i = 0; i < keys.length; ++i) { + keys[i] = i.toString(); + writer.write(keys[i] + '\n'); + } + + writer.flush(); + writer.close(); + + BucketKeyJournal readJournal = new BucketKeyJournal(keyPath, KeyJournal.Mode.READ, TEST_BUCKET); + + int readCount = 0; + for (Key key : readJournal) { + assertTrue(key.bucket().compareTo(TEST_BUCKET) == 0); + assertTrue(keys[readCount].compareTo(key.key()) == 0); + if (!key.errorKey()) + ++readCount; + } + + assertTrue(readCount == keys.length); + } @Test - public void testCreateKeyPathFromPath() { - File file = new File("/Users/dankerrigan/data.data"); + public void testCreateKeyPathFromPath() throws Exception { + File file = tempFolder.newFile("data.data"); File newPath = KeyJournal.createKeyPathFromPath(file, false); - assertTrue(newPath.getAbsolutePath().compareTo("/Users/dankerrigan/data.keys") == 0); + assertTrue(newPath.getName().compareTo("data.keys") == 0); newPath = KeyJournal.createKeyPathFromPath(file, true); - assertTrue(newPath.getAbsolutePath().compareTo("/Users/dankerrigan/data.loadedkeys") == 0); + assertTrue(newPath.getName().compareTo("data.loadedkeys") == 0); } diff --git a/src/test/java/com/basho/util/UtilitiesTests.java b/src/test/java/com/basho/util/UtilitiesTests.java new file mode 100644 index 0000000..665e876 --- /dev/null +++ b/src/test/java/com/basho/util/UtilitiesTests.java @@ -0,0 +1,81 @@ +package com.basho.util; + +import com.basho.proserv.datamigrator.Utilities; +import com.basho.proserv.datamigrator.io.AbstractKeyJournal; +import com.basho.proserv.datamigrator.io.IKeyJournal; +import com.basho.proserv.datamigrator.io.Key; +import com.basho.proserv.datamigrator.io.KeyJournal; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + +/** + * Created with IntelliJ IDEA. + * User: dankerrigan + * Date: 1/29/14 + * Time: 9:01 AM + * To change this template use File | Settings | File Templates. 
+ */ +public class UtilitiesTests { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + + @Test + public void testSplitKeys() throws Exception { + int KEY_COUNT = 10; + File keyPath = tempFolder.newFile("data.keys"); + KeyJournal writeKeyJournal = new KeyJournal(keyPath, KeyJournal.Mode.WRITE); + + String[] buckets = new String[] {"A", "B", "C", "D", "E", "F"}; + for (String bucket : buckets) { + for (Integer i = 0; i < KEY_COUNT; ++i) { + writeKeyJournal.write(new Key(bucket, i.toString())); + } + } + + writeKeyJournal.close(); + + KeyJournal readKeyJournal = new KeyJournal(keyPath, KeyJournal.Mode.READ); + + File splitFolder = tempFolder.newFolder(); + + Map readJournals = Utilities.splitKeys(splitFolder, readKeyJournal); + + String[] splitDirs = splitFolder.list(); + + int actualBucketCount = splitDirs.length; + + for (String splitDir : splitDirs) { + File newBucketKeyPath = new File(splitFolder.getAbsolutePath() + "/" + splitDir + "/" + "bucketkeys.keys"); + + KeyJournal newJournal = new KeyJournal(newBucketKeyPath, KeyJournal.Mode.READ); + + Iterator iter = newJournal.iterator(); + for (Integer i = 0; i < KEY_COUNT; ++i) { + assertTrue(i.toString().compareTo(iter.next().key()) == 0); + } + } + + assertTrue(actualBucketCount * KEY_COUNT== (buckets.length * KEY_COUNT)); + + + for (String bucketName : readJournals.keySet()) { + Set keys = new HashSet(); + for (Key key : readJournals.get(bucketName)) { + keys.add(key.key()); + } + assertEquals(keys.size(), KEY_COUNT); + } + } +}
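For orientation, the key-list feature in this change centers on the new IKeyJournal abstraction: KeyJournal reads line-delimited, URL-encoded `bucket,key` files (the `--loadkeys` format), while BucketKeyJournal reads key-only files bound to a single bucket (the `-b` plus `--bucketkeys` format). The snippet below is a minimal sketch of driving those classes directly, based only on the signatures visible in this diff; the file names, the bucket name "A_BUCKET", and the generic type parameters are assumptions for illustration, not part of the patch.

```java
import java.io.File;

import com.basho.proserv.datamigrator.io.BucketKeyJournal;
import com.basho.proserv.datamigrator.io.IKeyJournal;
import com.basho.proserv.datamigrator.io.Key;
import com.basho.proserv.datamigrator.io.KeyJournal;

public class KeyJournalSketch {
    public static void main(String[] args) {
        // bucket_keys.txt (hypothetical name) holds one URL-encoded "bucket,key"
        // pair per line -- the same format consumed by --loadkeys.
        IKeyJournal bucketAndKeys =
                new KeyJournal(new File("bucket_keys.txt"), KeyJournal.Mode.READ);
        for (Key key : bucketAndKeys) {
            // Key exposes the decoded bucket and key read from the journal.
            System.out.println(key.bucket() + " / " + key.key());
        }
        bucketAndKeys.close();

        // keys.txt (hypothetical name) holds one key per line; the bucket is
        // supplied up front, mirroring "-b A_BUCKET --bucketkeys keys.txt".
        IKeyJournal keysOnly =
                new BucketKeyJournal(new File("keys.txt"), KeyJournal.Mode.READ, "A_BUCKET");
        System.out.println("keys to process: " + keysOnly.getKeyCount());
        keysOnly.close();
    }
}
```

On the tool side, Utilities.splitKeys partitions such a journal into per-bucket journals on disk, which is how the new dumpKeys, transferKeys, and deleteKeys entry points process one bucket at a time.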