#*******************************************************************************
# Copyright (c) 2015 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#******************************************************************************/
from pyspark.sql import SparkSession
# Define Cloudant-related configuration.
# Set the protocol to http if needed; the default is https:
# config("cloudant.protocol", "http")
spark = SparkSession\
.builder\
.appName("Cloudant Spark SQL Example in Python using dataframes")\
.config("cloudant.host","ACCOUNT.cloudant.com")\
.config("cloudant.username", "USERNAME")\
.config("cloudant.password","PASSWORD")\
.config("jsonstore.rdd.partitions", 8)\
.getOrCreate()
# ***1. Loading dataframe from Cloudant db
df = spark.read.load("n_airportcodemapping", "com.cloudant.spark")
# When performing multiple operations on a dataframe (select, filter, etc.),
# you should persist it. Otherwise, every operation will load the same data
# from Cloudant again. Persisting also speeds up computation.
df.cache() # persisting in memory
# Alternatively, for large dbs, persist to both memory and disk:
# from pyspark import StorageLevel
# df.persist(StorageLevel.MEMORY_AND_DISK)
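# Once the cached dataframe is no longer needed, the storage can be released
# with the standard Spark API (not part of the original example):
# df.unpersist()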
df.printSchema()
df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show()
df.filter(df._id >= 'CAA').select("_id",'airportName').show()
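# The dataframe can also be queried with Spark SQL by registering a temporary
# view (a sketch using the standard SparkSession API; the view name "airports"
# is an arbitrary choice, not part of the original example):
# df.createOrReplaceTempView("airports")
# spark.sql("SELECT _id, airportName FROM airports WHERE _id >= 'CAA'").show()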
# ***2. Saving a dataframe to Cloudant db
df = spark.read.load(format="com.cloudant.spark", database="n_flight")
df.printSchema()
df2 = df.filter(df.flightSegmentId=='AA106')\
.select("flightSegmentId", "economyClassBaseCost")
df2.write.save("n_flight2", "com.cloudant.spark",
        bulkSize="100", createDBOnSave="true")
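# To verify the save, the new database can be read back with the same
# connector (a sketch; assumes n_flight2 was created by the save above):
# spark.read.load("n_flight2", "com.cloudant.spark").show()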
total = df.filter(df.flightSegmentId >'AA9').select("flightSegmentId",
"scheduledDepartureTime").orderBy(df.flightSegmentId).count()
print "Total", total, "flights from table"
# ***3. Loading dataframe from a Cloudant search index
df = spark.read.load(format="com.cloudant.spark", database="n_flight",
index="_design/view/_search/n_flights")
df.printSchema()
total = df.filter(df.flightSegmentId >'AA9').select("flightSegmentId",
"scheduledDepartureTime").orderBy(df.flightSegmentId).count()
print "Total", total, "flights from index"
# ***4. Loading dataframe from a Cloudant view
df = spark.read.load(format="com.cloudant.spark", path="n_flight",
view="_design/view/_view/AA0", schemaSampleSize="20")
# The schema for a view will always be: _id, key, value,
# where value can be a complex field
df.printSchema()
df.show()
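# Because the view's schema is _id, key, value, nested attributes of a complex
# value can be selected with dot notation (a sketch; assumes the view's value
# holds a document with a flightSegmentId field, which is not stated above):
# df.select("key", "value.flightSegmentId").show()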