diff --git a/kafka/.idea/encodings.xml b/kafka/.idea/encodings.xml new file mode 100644 index 0000000..97626ba --- /dev/null +++ b/kafka/.idea/encodings.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/kafka/.idea/kafka.iml b/kafka/.idea/kafka.iml new file mode 100644 index 0000000..0c73e5a --- /dev/null +++ b/kafka/.idea/kafka.iml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/kafka/.idea/misc.xml b/kafka/.idea/misc.xml new file mode 100644 index 0000000..ec5f9da --- /dev/null +++ b/kafka/.idea/misc.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/kafka/.idea/modules.xml b/kafka/.idea/modules.xml new file mode 100644 index 0000000..fc91a26 --- /dev/null +++ b/kafka/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/kafka/.idea/workspace.xml b/kafka/.idea/workspace.xml new file mode 100644 index 0000000..e0d46c7 --- /dev/null +++ b/kafka/.idea/workspace.xml @@ -0,0 +1,47 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + 1471737191470 + + + + + + + + + + \ No newline at end of file diff --git a/kafka/Vagrantfile b/kafka/Vagrantfile new file mode 100644 index 0000000..68c791e --- /dev/null +++ b/kafka/Vagrantfile @@ -0,0 +1,34 @@ +# -*- mode: ruby -*- +# vi: set ft=ruby : + +Vagrant.configure("2") do |config| + config.vm.box = "dummy" + + config.vm.provider :aws do |aws, override| + aws.secret_access_key = ENV['AWS_SECRET_ACCESS_KEY'] + aws.access_key_id = ENV['AWS_ACCESS_KEY_ID'] + aws.keypair_name = ENV['AWS_KEYPAIR_NAME'] + aws.ami = 'ami-a2490dc2' + aws.region = "us-west-1" + aws.instance_type = 'm3.large' + aws.security_groups = "mongodb" + aws.iam_instance_profile_name = 'mongodb' + + aws.tags = { + "Name" => "Kafka Instance", + } + aws.block_device_mapping = [{ + 'DeviceName' => '/dev/sda1', + 'Ebs.VolumeSize' => 16, + 'Ebs.VolumeType' => 'gp2' }] + + override.ssh.username = "ec2-user" + override.ssh.private_key_path = 
ENV['PRIVATE_AWS_SSH_KEY_PATH'] + override.nfs.functional = false + end + + config.vm.provision "ansible" do |ansible| + # ansible.verbose = 'vvvv' + ansible.playbook = "kafka.yml" + end +end diff --git a/kafka/ansible.cfg b/kafka/ansible.cfg new file mode 100644 index 0000000..00bfaa3 --- /dev/null +++ b/kafka/ansible.cfg @@ -0,0 +1,6 @@ +[ssh_connection] +control_path = /tmp/%%h-%%p-%%r + +[defaults] +nocows=1 +host_key_checking = False \ No newline at end of file diff --git a/kafka/kafka.yml b/kafka/kafka.yml new file mode 100644 index 0000000..01dd3ad --- /dev/null +++ b/kafka/kafka.yml @@ -0,0 +1,5 @@ +--- +- hosts: all + roles: + - zookeeper + - kafka \ No newline at end of file diff --git a/kafka/roles/kafka/handlers/main.yml b/kafka/roles/kafka/handlers/main.yml new file mode 100644 index 0000000..bd02aec --- /dev/null +++ b/kafka/roles/kafka/handlers/main.yml @@ -0,0 +1,7 @@ +--- +- name: start kafka + sudo: yes + shell: "{{kafka.symlink}}/bin/kafka-server-start.sh {{kafka.symlink}}/config/server.properties &" + args: + chdir: "{{kafka.symlink}}" + register: command_start_kafka \ No newline at end of file diff --git a/kafka/roles/kafka/tasks/main.yml b/kafka/roles/kafka/tasks/main.yml new file mode 100644 index 0000000..02f25b5 --- /dev/null +++ b/kafka/roles/kafka/tasks/main.yml @@ -0,0 +1,21 @@ +--- +- name: Get Kafka {{kafka.version}} + get_url: url={{kafka.url}} dest={{kafka.tgz}} timeout=60 + +- name: Create the remote directory for Kafka + sudo: yes + file: path={{kafka.untar_to}} state=directory mode=0755 + +- name: Unarchive Kafka + sudo: yes + unarchive: copy=no src={{kafka.tgz}} dest={{kafka.untar_to}} + +- name: Symlink to the Kafka version + sudo: yes + file: path={{kafka.symlink}} src={{kafka.symlink_to}} state=link + +- name: Generate the Kafka properties configuration + sudo: yes + template: src=server.properties.j2 dest=/usr/local/lib/kafka/config/server.properties mode=0644 + notify: + - start kafka \ No newline at end of file diff --git 
a/kafka/roles/kafka/templates/server.properties.j2 b/kafka/roles/kafka/templates/server.properties.j2 new file mode 100644 index 0000000..35e07aa --- /dev/null +++ b/kafka/roles/kafka/templates/server.properties.j2 @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=0 + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9092 + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +#host.name=localhost + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name= + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. 
+#advertised.port= + +# The number of threads handling network requests +num.network.threads=3 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=102400 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=102400 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +log.dirs=/var/log/kafka-logs + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=1 + +# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. +# This value is recommended to be increased for installations with data dirs located in RAID array. +num.recovery.threads.per.data.dir=1 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. 
+ +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=1073741824 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=300000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. 
+zookeeper.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=6000 \ No newline at end of file diff --git a/kafka/roles/kafka/vars/main.yml b/kafka/roles/kafka/vars/main.yml new file mode 100644 index 0000000..722e486 --- /dev/null +++ b/kafka/roles/kafka/vars/main.yml @@ -0,0 +1,8 @@ +--- +kafka: + version: 0.10.0.1 + url: http://apache.mirrors.pair.com/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz + tgz: /tmp/kafka_2.11-0.10.0.1.tgz + untar_to: /usr/local/lib + symlink_to: /usr/local/lib/kafka_2.11-0.10.0.1 + symlink: /usr/local/lib/kafka \ No newline at end of file diff --git a/kafka/roles/zookeeper/handlers/main.yml b/kafka/roles/zookeeper/handlers/main.yml new file mode 100644 index 0000000..3e76101 --- /dev/null +++ b/kafka/roles/zookeeper/handlers/main.yml @@ -0,0 +1,16 @@ +--- +- name: stop zookeeper + sudo: yes + command: bin/zkServer.sh stop + args: + chdir: "{{zookeeper.symlink}}" + removes: /var/lib/zookeeper/zookeeper_server.pid + +- name: start zookeeper + sudo: yes + command: bin/zkServer.sh start + args: + chdir: "{{zookeeper.symlink}}" + creates: /var/lib/zookeeper/zookeeper_server.pid + register: zookeeper_start_results + failed_when: "'FAILED' in zookeeper_start_results.stderr" \ No newline at end of file diff --git a/kafka/roles/zookeeper/tasks/main.yml b/kafka/roles/zookeeper/tasks/main.yml new file mode 100644 index 0000000..5da253b --- /dev/null +++ b/kafka/roles/zookeeper/tasks/main.yml @@ -0,0 +1,56 @@ +--- +- name: Update all packages + yum: name=* state=latest + sudo: yes + +- name: download oracle java8 + command: "wget -q -O {{java.java_archive}} --no-cookies --no-check-certificate --header 'Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie' {{java.java8_download_url}} creates={{java.java_archive}}" + sudo: yes + +- name: upack Java8 archive + sudo: yes + unarchive: + src={{java.java_archive}} + dest={{java.java8_download_folder}} + copy=no + +- name: 
fix Java8 ownership + sudo: yes + file: + state=directory + path={{java.java_name}} + owner=root + group=root + recurse=yes + +- name: make java available for system + sudo: yes + command: 'alternatives --install "/usr/bin/java" "java" "{{java.java_name}}/bin/java" 2000' + +- name: cleanup + sudo: yes + file: + state=absent + path={{java.java_archive}} + +- name: Download ZooKeeper {{zookeeper.version}} + get_url: url={{zookeeper.url}} dest={{zookeeper.tgz}} timeout=60 + +- name: Create the remote directory for ZooKeeper + sudo: yes + file: path={{zookeeper.untar_to}} state=directory mode=0755 + +- name: Unarchive ZooKeeper + sudo: yes + unarchive: copy=no src={{zookeeper.tgz}} dest={{zookeeper.untar_to}} + +- name: Symlink to the ZooKeeper version + sudo: yes + file: path={{zookeeper.symlink}} src={{zookeeper.symlink_to}} state=link + +- name: Create the ZooKeeper configuration + sudo: yes + template: src=zoo.cfg.j2 dest={{zookeeper.symlink}}/conf/zoo.cfg + notify: + - stop zookeeper + - start zookeeper \ No newline at end of file diff --git a/kafka/roles/zookeeper/templates/zoo.cfg.j2 b/kafka/roles/zookeeper/templates/zoo.cfg.j2 new file mode 100644 index 0000000..c1239c8 --- /dev/null +++ b/kafka/roles/zookeeper/templates/zoo.cfg.j2 @@ -0,0 +1,3 @@ +tickTime=2000 +dataDir=/var/lib/zookeeper +clientPort=2181 \ No newline at end of file diff --git a/kafka/roles/zookeeper/vars/main.yml b/kafka/roles/zookeeper/vars/main.yml new file mode 100644 index 0000000..9c98437 --- /dev/null +++ b/kafka/roles/zookeeper/vars/main.yml @@ -0,0 +1,15 @@ +--- +zookeeper: + version: 3.4.8 + url: http://apache.cs.utah.edu/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz + tgz: /tmp/zookeeper-3.4.8.tar.gz + untar_to: /usr/local/lib/ + symlink_to: /usr/local/lib/zookeeper-3.4.8 + symlink: /usr/local/lib/zookeeper + +java: + java8_download_url: 'http://download.oracle.com/otn-pub/java/jdk/8u102-b14/jdk-8u102-linux-x64.tar.gz' + java8_download_folder: /opt + java_name: 
'/opt/jdk1.8.0_102' + java_archive: '/opt/jdk-8u102-linux-x64.tar.gz' + diff --git a/mongodb/.idea/encodings.xml b/mongodb/.idea/encodings.xml new file mode 100644 index 0000000..97626ba --- /dev/null +++ b/mongodb/.idea/encodings.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/mongodb/.idea/misc.xml b/mongodb/.idea/misc.xml new file mode 100644 index 0000000..ec5f9da --- /dev/null +++ b/mongodb/.idea/misc.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/mongodb/.idea/modules.xml b/mongodb/.idea/modules.xml new file mode 100644 index 0000000..dc2032d --- /dev/null +++ b/mongodb/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/mongodb/.idea/mongodb.iml b/mongodb/.idea/mongodb.iml new file mode 100644 index 0000000..0c73e5a --- /dev/null +++ b/mongodb/.idea/mongodb.iml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/mongodb/.idea/workspace.xml b/mongodb/.idea/workspace.xml new file mode 100644 index 0000000..aff1b5e --- /dev/null +++ b/mongodb/.idea/workspace.xml @@ -0,0 +1,47 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + 1471737215774 + + + + + + + + + + \ No newline at end of file diff --git a/mongodb/Vagrantfile b/mongodb/Vagrantfile new file mode 100644 index 0000000..494dece --- /dev/null +++ b/mongodb/Vagrantfile @@ -0,0 +1,35 @@ +# -*- mode: ruby -*- +# vi: set ft=ruby : + +Vagrant.configure("2") do |config| + config.vm.box = "dummy" + + config.vm.provider :aws do |aws, override| + aws.secret_access_key = ENV['AWS_SECRET_ACCESS_KEY'] + aws.access_key_id = ENV['AWS_ACCESS_KEY_ID'] + aws.keypair_name = ENV['AWS_KEYPAIR_NAME'] + aws.ami = 'ami-a2490dc2' + aws.region = "us-west-1" + aws.instance_type = 'm3.large' + aws.security_groups = "mongodb" + aws.iam_instance_profile_name = 'mongodb' + + aws.tags = { + "Name" => "MongoDB Instance", + } + aws.block_device_mapping = [{ + 'DeviceName' => '/dev/sda1', + 'Ebs.VolumeSize' => 
16, + 'Ebs.VolumeType' => 'gp2', + 'Ebs.DeleteOnTermination' => 'true'}] + + override.ssh.username = "ec2-user" + override.ssh.private_key_path = ENV['PRIVATE_AWS_SSH_KEY_PATH'] + override.nfs.functional = false + end + + config.vm.provision "ansible" do |ansible| + ansible.verbose = 'vvvv' + ansible.playbook = "mongodb.yml" + end + end diff --git a/mongodb/ansible.cfg b/mongodb/ansible.cfg new file mode 100644 index 0000000..00bfaa3 --- /dev/null +++ b/mongodb/ansible.cfg @@ -0,0 +1,6 @@ +[ssh_connection] +control_path = /tmp/%%h-%%p-%%r + +[defaults] +nocows=1 +host_key_checking = False \ No newline at end of file diff --git a/mongodb/mongodb.yml b/mongodb/mongodb.yml new file mode 100644 index 0000000..2fe00fb --- /dev/null +++ b/mongodb/mongodb.yml @@ -0,0 +1,5 @@ +--- +- hosts: all + roles: + - common + - mongod \ No newline at end of file diff --git a/mongodb/roles/common/tasks/main.yml b/mongodb/roles/common/tasks/main.yml new file mode 100644 index 0000000..775e6c9 --- /dev/null +++ b/mongodb/roles/common/tasks/main.yml @@ -0,0 +1,24 @@ +--- +# This Playbook runs all the common plays in the deployment +- name: install 10Gen repo + template: src=10gen.repo.j2 dest=/etc/yum.repos.d/10gen.repo + sudo: yes + +- name: Create the mongod user + user: name=mongod comment="MongoD" + sudo: yes + +- name: Create the data directory for the namenode metadata + file: path={{ mongodb_datadir_prefix }} owner=mongod group=mongod state=directory + sudo: yes + +- name: Install all updates + yum: name=* state=latest + sudo: yes + +- name: Install the mongodb package + sudo: yes + yum: name={{ item }} state=installed + with_items: + - mongo-10gen + - mongo-10gen-server diff --git a/mongodb/roles/common/templates/10gen.repo.j2 b/mongodb/roles/common/templates/10gen.repo.j2 new file mode 100644 index 0000000..d0042cb --- /dev/null +++ b/mongodb/roles/common/templates/10gen.repo.j2 @@ -0,0 +1,5 @@ +[10gen] +name=10gen Repository 
+baseurl=http://downloads-distro.mongodb.org/repo/redhat/os/x86_64 +gpgcheck=0 +enabled=1 diff --git a/mongodb/roles/common/vars/main.yml b/mongodb/roles/common/vars/main.yml new file mode 100644 index 0000000..7786191 --- /dev/null +++ b/mongodb/roles/common/vars/main.yml @@ -0,0 +1,15 @@ +--- +# The directory prefix where the database files would be stored +mongodb_datadir_prefix: /data/ + +# The interface where the mongodb process should listen on. +# Defaults to the first interface. Change this to: +# +# iface: eth1 +# +# ...to override. +# +iface: '{{ ansible_default_ipv4.interface }}' + +# The password for admin user +mongo_admin_pass: 123456 \ No newline at end of file diff --git a/mongodb/roles/mongod/tasks/main.yml b/mongodb/roles/mongod/tasks/main.yml new file mode 100644 index 0000000..3a2135d --- /dev/null +++ b/mongodb/roles/mongod/tasks/main.yml @@ -0,0 +1,35 @@ +--- +# This role deploys the mongod processes and sets up the replication set. + +- name: create data directory for mongodb + sudo: yes + file: + path={{ mongodb_datadir_prefix }}/mongo + state=directory + owner=mongod + group=mongod + +- name: create log directory for mongodb + sudo: yes + file: + path=/var/log/mongodb + state=directory + owner=mongod + group=mongod + +- name: create run directory for mongodb + sudo: yes + file: + path=/var/run/mongodb + state=directory + owner=mongod + group=mongod + +- name: Create the mongodb configuration file + sudo: yes + template: src=mongod.conf.j2 dest=/etc/mongod.conf + +- name: Start the mongodb service + sudo: yes + service: name=mongod state=started + diff --git a/mongodb/roles/mongod/templates/mongod.conf.j2 b/mongodb/roles/mongod/templates/mongod.conf.j2 new file mode 100644 index 0000000..bc704aa --- /dev/null +++ b/mongodb/roles/mongod/templates/mongod.conf.j2 @@ -0,0 +1,76 @@ +# mongod.conf + +#where to log +logpath=/var/log/mongodb/mongod.log + +logappend=true + +# fork and run in background +fork=true + +#port=27017 + +dbpath={{ 
mongodb_datadir_prefix }}mongo + +# location of pidfile +pidfilepath=/var/run/mongodb/mongod.pid + +# Listen to local interface only. Comment out to listen on all interfaces. +bind_ip=0.0.0.0 + +# Disables write-ahead journaling +# nojournal=true + +# Enables periodic logging of CPU utilization and I/O wait +#cpu=true + +# Turn on/off security. Off is currently the default +#noauth=true +#auth=true + +# Verbose logging output. +#verbose=true + +# Inspect all client data for validity on receipt (useful for +# developing drivers) +#objcheck=true + +# Enable db quota management +#quota=true + +# Set oplogging level where n is +# 0=off (default) +# 1=W +# 2=R +# 3=both +# 7=W+some reads +#diaglog=0 + +# Ignore query hints +#nohints=true + +# Enable the HTTP interface (Defaults to port 28017). +#httpinterface=true + +# Turns off server-side scripting. This will result in greatly limited +# functionality +#noscripting=true + +# Turns off table scans. Any query that would do a table scan fails. +#notablescan=true + +# Disable data file preallocation. +#noprealloc=true + +# Specify .ns file size for new databases. +# nssize= + +# Replication Options + +# in replicated mongo databases, specify the replica set name here +#replSet=setname +# maximum size in megabytes for replication operation log +#oplogSize=1024 +# path to a key file storing authentication info for connections +# between replica set members +#keyFile=/path/to/keyfile \ No newline at end of file diff --git a/mongodb/roles/mongod/vars/main.yml b/mongodb/roles/mongod/vars/main.yml new file mode 100644 index 0000000..7786191 --- /dev/null +++ b/mongodb/roles/mongod/vars/main.yml @@ -0,0 +1,15 @@ +--- +# The directory prefix where the database files would be stored +mongodb_datadir_prefix: /data/ + +# The interface where the mongodb process should listen on. +# Defaults to the first interface. Change this to: +# +# iface: eth1 +# +# ...to override. 
+# +iface: '{{ ansible_default_ipv4.interface }}' + +# The password for admin user +mongo_admin_pass: 123456 \ No newline at end of file diff --git a/rest/.idea/modules.xml b/rest/.idea/modules.xml index 7896e0d..f94aed2 100644 --- a/rest/.idea/modules.xml +++ b/rest/.idea/modules.xml @@ -2,6 +2,8 @@ + + diff --git a/rest/.idea/rest.iml b/rest/.idea/rest.iml index cf813a6..179da26 100644 --- a/rest/.idea/rest.iml +++ b/rest/.idea/rest.iml @@ -7,6 +7,8 @@ + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -28,58 +61,36 @@ - - - - - - - - - - - - - - - - - - - - - - - + - - + + - + - - + + - - + + - + - - + + - - + + @@ -88,8 +99,8 @@ - - + + @@ -97,16 +108,6 @@ - - - - - - - - - - @@ -117,6 +118,9 @@ + + @@ -144,9 +167,9 @@ DEFINITION_ORDER - @@ -174,8 +197,8 @@ - + @@ -204,7 +227,7 @@ @@ -214,11 +237,21 @@ + + + + + + + project + + + + + @@ -539,29 +614,29 @@ - - + + - - - - - - - - - - - + + + + + + + + + + - - + + - + + @@ -613,17 +688,25 @@ - + - - + + + + + + + + + + - + - + @@ -631,57 +714,150 @@ - + - - + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - + + + + + + + + + + + + - - + + - + - - + + - + - - - - - + + + - + @@ -689,67 +865,205 @@ - + - - + + - + - - + + + + + + + + + + - + - + - - + + - + - - + + - + - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - + + - + - - + + - + + + + + + + + + \ No newline at end of file diff --git a/twitter/mongoConsumer.py b/twitter/mongoConsumer.py new file mode 100644 index 0000000..4a980b4 --- /dev/null +++ b/twitter/mongoConsumer.py @@ -0,0 +1,48 @@ +#!/usr/bin/python + +import argparse +import json +from kafka import KafkaConsumer +from pymongo import 
MongoClient + + +parser = argparse.ArgumentParser( + description='Attach to the twitter stream API and send the tweets to a destination', + add_help=True, + formatter_class=argparse.RawDescriptionHelpFormatter) + +parser.add_argument('--kafkahost', + type=str, + required=False, + help='The host name of the Kafka broker if using Kafka serialization') +parser.add_argument('--kafkaport', + type=int, + required=False, + help='The port of the Kafka broker if using Kafka serialization') +parser.add_argument('--mongohost', + type=str, + required=False, + help='The host name of the mongoDB server') +parser.add_argument('--mongoport', + type=int, + required=False, + help='The port number of the mongoDB server') + +args = parser.parse_args() + +mongo_client = MongoClient(host=args.mongohost, port=args.mongoport) + +kafka_server = "{0}:{1}".format(args.kafkahost, args.kafkaport) + +consumer = KafkaConsumer('tweets', + group_id='tweet-group', + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + bootstrap_servers=[kafka_server]) + +for message in consumer: + mongo_client.tweets.tweets.save(message.value) + + + + + diff --git a/twitter/tweet_serializers/kafkaTweetSerializer.py b/twitter/tweet_serializers/kafkaTweetSerializer.py index 7f2a745..fca76df 100644 --- a/twitter/tweet_serializers/kafkaTweetSerializer.py +++ b/twitter/tweet_serializers/kafkaTweetSerializer.py @@ -1,48 +1,22 @@ -import threading -import logging -import time +from kafka import KafkaProducer +import json -from kafka import KafkaConsumer, KafkaProducer +""" -class Producer(threading.Thread): - daemon = True + Simple unbuffered Kafka Producer - def run(self): - producer = KafkaProducer(bootstrap_servers='localhost:9092') +""" - while True: - producer.send('my-topic', b"test") - producer.send('my-topic', b"\xc2Hola, mundo!") - time.sleep(1) +class KafkaTweetSerializer: + _producer = None -class Consumer(threading.Thread): - daemon = True + def __init__(self, host='localhost', port='9092'): + kafka_server = 
"{0}:{1}".format(host, str(port)) + self._producer = KafkaProducer(bootstrap_servers=kafka_server, + value_serializer=lambda v: json.dumps(v).encode('utf-8')) - def run(self): - consumer = KafkaConsumer(bootstrap_servers='localhost:9092', - auto_offset_reset='earliest') - consumer.subscribe(['my-topic']) - - for message in consumer: - print (message) - - -def main(): - threads = [ - Producer(), - Consumer() - ] - - for t in threads: - t.start() - - time.sleep(10) - -if __name__ == "__main__": - logging.basicConfig( - format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', - level=logging.INFO - ) - main() \ No newline at end of file + def write(self, message): + self._producer.send(topic='tweets', value=message) + self._producer.flush() diff --git a/twitter/twitter.py b/twitter/twitter.py index 1b1c238..4dcc39f 100644 --- a/twitter/twitter.py +++ b/twitter/twitter.py @@ -3,6 +3,7 @@ from twitter_utils.apikeys import apikeys from twitter_utils.twitterStreamListener import TwitterStreamListener from tweet_serializers.consoleTweetSerializer import ConsoleTweetSerializer +from tweet_serializers.kafkaTweetSerializer import KafkaTweetSerializer import argparse import time @@ -29,15 +30,28 @@ action='store_true', required=False, help='Sample the Twitter stream') +parser.add_argument('--serializer', + choices=['console', 'kafka'], + help='Where to send the tweets.' + ) +parser.add_argument('--kafkahost', + type=str, + required=False, + help='The host name of the Kafka broker if using Kafka serialization') +parser.add_argument('--kafkaport', + type=int, + required=False, + help='The port of the Kafka broker if using Kafka serialization') args = parser.parse_args() - -# fetchSize = 1500 - -print "Writing tweets to console..." -serializer = ConsoleTweetSerializer() -fetchSize = 10 +if args.serializer == 'console': + print "Writing tweets to console..." 
+ serializer = ConsoleTweetSerializer() + fetchSize = 10 +else: + print "Writing tweets to Kafka..." + serializer = KafkaTweetSerializer(host=args.kafkahost, port=args.kafkaport) startTime = time.time()