This repository has been archived by the owner on Mar 24, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathCloudantReceiver.scala
68 lines (59 loc) · 2.07 KB
/
CloudantReceiver.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.cloudant.spark
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel
import com.cloudant.spark.common._
import play.api.libs.json.Json
import scalaj.http._
/**
 * Spark Streaming receiver that POSTs to the URL returned by
 * `config.getContinuousChangesUrl()` and stores the `doc` field of every
 * received change line as a JSON string.
 *
 * @param cloudantParams options handed to `JsonStoreConfigManager.getConfig`
 *                       to build the [[CloudantConfig]] used by this receiver
 */
class CloudantReceiver(cloudantParams: Map[String, String])
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {

  /** Configuration derived from `cloudantParams`; resolved on first access. */
  lazy val config: CloudantConfig =
    JsonStoreConfigManager.getConfig(cloudantParams).asInstanceOf[CloudantConfig]

  /** Spawns the thread that consumes the feed, so this method returns immediately. */
  def onStart(): Unit = {
    new Thread("Cloudant Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  /**
   * Opens the changes request and pushes each change's document into Spark
   * via `store`. A non-200 response or any non-fatal failure is passed to
   * `reportError` instead of silently terminating the receiving thread.
   */
  private def receive(): Unit = {
    try {
      val url = config.getContinuousChangesUrl()
      // A missing (null) selector means "no filter": send an empty JSON body.
      val selector: String = Option(config.getSelector())
        .fold("{}")(sel => "{\"selector\":" + sel + "}")

      // readTimeoutMs = 0 disables the read timeout so the connection can stay open.
      val baseRequest: HttpRequest = Http(url)
        .postData(selector)
        .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
        .header("Content-Type", "application/json")
        .header("User-Agent", "spark-cloudant")

      // Credentials are attached only when a username is configured.
      val clRequest: HttpRequest = Option(config.username)
        .fold(baseRequest)(user => baseRequest.auth(user, config.password))

      clRequest.exec((code, headers, is) => {
        if (code == 200) {
          scala.io.Source.fromInputStream(is, "utf-8").getLines()
            // Stop consuming once the receiver has been stopped; the check
            // runs each time a line (including an empty one) arrives.
            .takeWhile(_ => !isStopped())
            .filter(_.nonEmpty)
            .foreach { line =>
              // Lines without a "doc" field are skipped rather than
              // aborting the whole feed.
              (Json.parse(line) \ "doc").toOption
                .foreach(doc => store(Json.stringify(doc)))
            }
        } else {
          // The "Status" header may be absent; fall back to the numeric code
          // instead of indexing into an empty sequence.
          val status = headers.get("Status")
            .flatMap(_.headOption)
            .getOrElse("HTTP " + code)
          val errorMsg = "Error retrieving _changes feed for a database " +
            config.getDbname() + ": " + status
          reportError(errorMsg, new RuntimeException(errorMsg))
        }
      })
    } catch {
      // Surface connection/parse failures to Spark; fatal errors still propagate.
      case scala.util.control.NonFatal(e) =>
        reportError("Error receiving data from Cloudant _changes feed", e)
    }
  }

  /**
   * Nothing to release here: the receiving thread checks `isStopped()`
   * itself while reading lines and ends on its own.
   */
  def onStop(): Unit = {}
}