diff --git a/Gemfile b/Gemfile index e8826424b4..6785cc2a8e 100644 --- a/Gemfile +++ b/Gemfile @@ -182,10 +182,6 @@ gem 'oj_mimic_json' # CORS for local testing/dev gem 'rack-cors' -# Salesforce streaming API -gem 'cookiejar', git: 'https://github.com/MissionCapital/cookiejar.git' -gem 'faye', '0.8.9' - gem 'blazer' group :development, :test do diff --git a/Gemfile.lock b/Gemfile.lock index 132ca4a1a8..76fe41bc61 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,9 +1,3 @@ -GIT - remote: https://github.com/MissionCapital/cookiejar.git - revision: 68eecc8415338c6e9d636c4f7e4d2e13547e20a2 - specs: - cookiejar (0.3.2) - GIT remote: https://github.com/openstax/intl-tel-input-rails.git revision: 3a63a495822d90bc9ca93e9541ef252fd7a0b50b @@ -252,14 +246,6 @@ GEM dry-equalizer (~> 0.2) dry-logic (~> 0.4.2) dry-types (~> 0.13.1) - em-http-request (1.1.7) - addressable (>= 2.3.4) - cookiejar (!= 0.3.1) - em-socksify (>= 0.3) - eventmachine (>= 1.0.3) - http_parser.rb (>= 0.6.0) - em-socksify (0.3.2) - eventmachine (>= 1.0.0.beta.4) em-websocket (0.5.1) eventmachine (>= 0.12.9) http_parser.rb (~> 0.6.0) @@ -286,16 +272,6 @@ GEM faraday (>= 0.8) faraday_middleware (1.0.0) faraday (~> 1.0) - faye (0.8.9) - cookiejar (>= 0.3.0) - em-http-request (>= 0.3.0) - eventmachine (>= 0.12.0) - faye-websocket (>= 0.4.0) - rack (>= 1.0.0) - yajl-ruby (>= 1.0.0) - faye-websocket (0.11.1) - eventmachine (>= 0.12.0) - websocket-driver (>= 0.5.1) ffi (1.15.4) fine_print (6.0.1) action_interceptor @@ -761,7 +737,6 @@ GEM will_paginate (3.3.0) xpath (3.2.0) nokogiri (~> 1.8) - yajl-ruby (1.4.2) PLATFORMS ruby @@ -790,7 +765,6 @@ DEPENDENCIES codecov coffee-rails (= 5.0.0) compass-rails (~> 3.1.0) - cookiejar! database_cleaner db-query-matchers debase @@ -806,7 +780,6 @@ DEPENDENCIES fakeredis faraday (~> 1.0.0) faraday_middleware (~> 1.0.0) - faye (= 0.8.9) fine_print font-awesome-rails guard-livereload (~> 2.5) diff --git a/app/models/push_topic.rb b/app/models/push_topic.rb deleted file mode 100644 index 4425d94174..0000000000 --- a/app/models/push_topic.rb +++ /dev/null @@ -1,2 +0,0 @@ -class PushTopic < ApplicationRecord -end diff --git a/app/models/salesforce_streaming_replay.rb b/app/models/salesforce_streaming_replay.rb deleted file mode 100644 index f1b28482a0..0000000000 --- a/app/models/salesforce_streaming_replay.rb +++ /dev/null @@ -1,2 +0,0 @@ -class SalesforceStreamingReplay < ApplicationRecord -end diff --git a/app/services/contact_parser.rb b/app/services/contact_parser.rb deleted file mode 100644 index 8834bc6edf..0000000000 --- a/app/services/contact_parser.rb +++ /dev/null @@ -1,99 +0,0 @@ -class ContactParser - - def initialize(event) - @event = event - end - - def save_contact - contact_params = sanitize_contact - ci_table = ContactInfo.arel_table - - user = User.find_by(uuid: contact_params[:accounts_uuid].to_s) - if !user.present? - user = User.joins(:contact_infos).eager_load(:contact_infos).where(ci_table[:value].lower.eq(contact_params[:email])).first - # TODO: lookup using all emails - end - - if user.present? - school = School.select(:id, :salesforce_id, :location, :type).where( - salesforce_id: contact_params[:sf_id] - ).index_by(&:salesforce_id) - - if school.present? - user.school = school.id - user.school_type = case school&.type - when *COLLEGE_TYPES - :college - when *HIGH_SCHOOL_TYPES - :high_school - when *K12_TYPES - :k12_school - when *HOME_SCHOOL_TYPES - :home_school - when NilClass - :unknown_school_type - else - :other_school_type - end - - user.school_location = case school&.location - when *DOMESTIC_SCHOOL_LOCATIONS - :domestic_school - when *FOREIGN_SCHOOL_LOCATIONS - :foreign_school - else - :unknown_school_location - end - - user.is_kip = school&.is_kip || school&.is_child_of_kip - else - warn("User #{user.id} has a school that is in SF but not cached yet #{contact_params[:school_id]}.") - # TODO: this is how we will be able to tell who needs to get synced during cron, after their school has synced. - # It'd be a good idea to let this run for awhile like this before changing the cron. - # Might also be worthwhile to add a streaming subscriber for SF schools? - user.needs_sync = true - end - - user.salesforce_contact_id = contact_params[:sf_id] - user.using_openstax = contact_params[:adoption_status] - - user.faculty_status = case contact_params[:faculty_verified] - when "confirmed_faculty" - :confirmed_faculty - when "pending_faculty" - :pending_faculty - when "rejected_faculty" - :rejected_faculty - when NilClass - :no_faculty_info - else - raise "Unknown faculty_verified field: '#{contact.faculty_verified}'' on contact #{contact.id}" - end - - user.grant_tutor_access = contact_params[:grant_tutor_access] - user.save! - Rails.logger.debug('Contact saved ID: ' + user.salesforce_contact_id) - else - Rails.logger.debug("No contact found for email #{contact_params[:email]}") - # this should not be happening for people we don't have emails for - let's log to sentry so we can investigate - Sentry.capture_message("[SF streaming] No contact found for email #{contact_params[:email]}") - end - - end - - private - - def sanitize_contact - sobject = @event['sobject'] - { - sf_id: sobject['Id'], - school_id: sobject['AccountId'], - email: sobject['Email'], - email_alt: sobject['Email_alt__c'], - faculty_verified: sobject['FV_Status__c'], - adoption_status: sobject['Adoption_Status__c'], - grant_tutor_access: sobject['Grant_Tutor_Access__c'], - accounts_uuid: sobject['Accounts_UUID__c'] - } - end -end diff --git a/app/services/lead_parser.rb b/app/services/lead_parser.rb deleted file mode 100644 index fb8d245fba..0000000000 --- a/app/services/lead_parser.rb +++ /dev/null @@ -1,57 +0,0 @@ -class LeadParser - - def initialize(event) - @event = event - end - - def save_lead - lead_params = sanitize_lead - ci_table = ContactInfo.arel_table - - user = User.find_by(uuid: lead_params[:accounts_uuid].to_s) - if !user.present? - user = User.joins(:contact_infos).eager_load(:contact_infos).where(ci_table[:value].lower.eq(lead_params[:email])).first - # TODO: lookup using all emails - end - - - if user.present? - user.salesforce_lead_id = lead_params[:sf_id] - - user.faculty_status = case lead_params[:faculty_verified] - when "confirmed_faculty" - :confirmed_faculty - when "pending_faculty" - :pending_faculty - when "rejected_faculty" - :rejected_faculty - when NilClass - :no_faculty_info - else - raise "Unknown faculty_verified field: '#{lead_params[:faculty_verified]}'' on lead #{lead_params[:sf_id]}" - end - - user.save! - store_salesforce_lead_id(user, lead_params[:sf_id]) - Rails.logger.debug('Lead saved ID: ' + user.salesforce_lead_id) - else - Rails.logger.debug("No lead found for email #{lead_params[:email]}") - # this should not be happening for people we don't have emails for - let's log to sentry so we can investigate - Sentry.capture_message("[SF streaming] No lead found for email #{lead_params[:email]}") - end - - end - - private - - def sanitize_lead - sobject = @event['sobject'] - { - sf_id: sobject['Id'], - email: sobject['Email'], - all_email: sobject['All_Emails__c'], - faculty_verified: sobject['FV_Status__c'], - accounts_uuid: sobject['Accounts_UUID__c'] - } - end -end diff --git a/app/services/salesforce_contact_replay_handler.rb b/app/services/salesforce_contact_replay_handler.rb deleted file mode 100644 index ead09abc6d..0000000000 --- a/app/services/salesforce_contact_replay_handler.rb +++ /dev/null @@ -1,45 +0,0 @@ -class SalesforceContactReplayHandler - - MAX_AGE = 86_400 # 24 hours - - INIT_REPLAY_ID = -1 - DEFAULT_REPLAY_ID = -2 - - def initialize() - @channels = {} - @push_topic = PushTopic.where(topic_name: SalesforceSubscriber::CONTACT_PUSH_TOPIC_NAME).first - @replay = SalesforceStreamingReplay.find_or_create_by!(push_topics_id: @push_topic.id) - end - - # This method is called during the initial subscribe phase - # in order to send the correct replay ID. - def [](channel) - if @replay.replay_id.nil? - # there is no replay id yet, so just start listening for new events and updating the database with replay ids - puts "[#{channel}] No timestamp defined, sending magic replay ID #{INIT_REPLAY_ID}" - - INIT_REPLAY_ID - elsif old_replay_id? - # the id is too old to playback from the event - so playback all the events from the last 24 hours - puts "[#{channel}] Old timestamp, sending magic replay ID #{DEFAULT_REPLAY_ID}" - @replay.destroy! - @replay = SalesforceStreamingReplay.find_or_create_by!(push_topics_id: @push_topic.id) - - DEFAULT_REPLAY_ID - else - @channels[channel] = @replay.replay_id - end - end - - def []=(channel, replay_id) - puts "[#{channel}] Writing replay ID: #{replay_id}" - - @replay.replay_id = replay_id - @replay.save! - @channels[channel] = replay_id - end - - def old_replay_id? - @replay.updated_at.is_a?(Time) && Time.now - @replay.updated_at > MAX_AGE - end -end diff --git a/app/services/salesforce_subscriber.rb b/app/services/salesforce_subscriber.rb deleted file mode 100644 index 37006bd1e8..0000000000 --- a/app/services/salesforce_subscriber.rb +++ /dev/null @@ -1,101 +0,0 @@ -# Restforce uses faye as the underlying implementation for CometD. - -require 'restforce' -require 'faye' - -class SalesforceSubscriber - attr_reader :client - CONTACT_PUSH_TOPIC_NAME = 'ContactChange' - LEAD_PUSH_TOPIC_NAME = 'LeadChange' - - def initialize - @client = OpenStax::Salesforce::Client.new - @authorization_hash = @client.authenticate! - end - - def create_contact_push_topic - topic = PushTopic.where(topic_name: CONTACT_PUSH_TOPIC_NAME).first - - unless topic - begin - retries ||= 0 - begin - contact_topic = @client.create!('PushTopic', - ApiVersion: '51.0', - Name: CONTACT_PUSH_TOPIC_NAME, - Description: 'all contact records', - NotifyForOperationCreate: 'true', - NotifyForOperationUpdate: 'true', - NotifyForFields: 'Referenced', - Query: 'select Id, AccountId, Email, Email_alt__c, FV_Status__c, Adoption_Status__c, Grant_Tutor_Access__c from Contact') - rescue - Rails.logger.debug('Salesforce contact stream already created.') - end - - if contact_topic.present? && contact_topic.is_a?(String) - PushTopic.create(topic_salesforce_id: contact_topic, topic_name: CONTACT_PUSH_TOPIC_NAME) - warn('Contact Push Topic Id: ' + contact_topic) - else - Rails.logger.error('failed to create push topic: ' + CONTACT_PUSH_TOPIC_NAME) - Sentry.capture_message('failed to create push topic: ' + CONTACT_PUSH_TOPIC_NAME) - raise - end - rescue Restforce::ErrorCode::DuplicateValue - Rails.logger.debug('Push topic duplicate found.') - retry if (retries += 1) < 3 - end - end - end - - def create_lead_push_topic - topic = PushTopic.where(topic_name: LEAD_PUSH_TOPIC_NAME).first - - unless topic - begin - retries ||= 0 - begin - lead_topic = @client.create!('PushTopic', - ApiVersion: '51.0', - Name: LEAD_PUSH_TOPIC_NAME, - Description: 'all lead records', - NotifyForOperationCreate: 'true', - NotifyForOperationUpdate: 'true', - NotifyForFields: 'Referenced', - Query: 'select Id, Email, All_Emails__c, FV_Status__c, Accounts_UUID__c from Lead') - rescue - Rails.logger.debug('Salesforce lead stream already created.') - end - - if lead_topic.present? && lead_topic.is_a?(String) - PushTopic.create(topic_salesforce_id: lead_topic, topic_name: LEAD_PUSH_TOPIC_NAME) - warn('Lead Push Topic Id: ' + lead_topic) - else - Rails.logger.error('failed to create push topic: ' + LEAD_PUSH_TOPIC_NAME) - Sentry.capture_message('failed to create push topic: ' + LEAD_PUSH_TOPIC_NAME) - raise - end - rescue Restforce::ErrorCode::DuplicateValue - Rails.logger.debug('Push topic duplicate found.') - retry if (retries += 1) < 3 - end - end - end - - def subscribe_contacts - @client.faye.set_header 'Authorization', "OAuth #{@authorization_hash.access_token}" - EM.run do - @client.subscription "/topic/#{CONTACT_PUSH_TOPIC_NAME}", replay: -1 do |message| - ContactParser.new(message).save_contact - end - end - end - - def subscribe_leads - @client.faye.set_header 'Authorization', "OAuth #{@authorization_hash.access_token}" - EM.run do - @client.subscription "/topic/#{LEAD_PUSH_TOPIC_NAME}", replay: -1 do |message| - LeadParser.new(message).save_lead - end - end - end -end diff --git a/db/migrate/20240123173834_drop_streaming_tables.rb b/db/migrate/20240123173834_drop_streaming_tables.rb new file mode 100644 index 0000000000..a07300027e --- /dev/null +++ b/db/migrate/20240123173834_drop_streaming_tables.rb @@ -0,0 +1,10 @@ +class DropStreamingTables < ActiveRecord::Migration[5.2] + def up + drop_table :salesforce_streaming_replays + drop_table :push_topics + end + + def down + raise ActiveRecord::IrreversibleMigration + end +end diff --git a/db/schema.rb b/db/schema.rb index b3f068c00a..040887a303 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2023_10_04_175741) do +ActiveRecord::Schema.define(version: 2024_01_23_173834) do # These are extensions that must be enabled in order to support this database enable_extension "citext" @@ -348,19 +348,6 @@ t.index ["contact_info_kind"], name: "index_pre_auth_states_on_contact_info_kind" end - create_table "push_topics", force: :cascade do |t| - t.string "topic_salesforce_id" - t.string "topic_name" - end - - create_table "salesforce_streaming_replays", force: :cascade do |t| - t.bigint "push_topics_id" - t.datetime "created_at", null: false - t.datetime "updated_at", null: false - t.integer "replay_id" - t.index ["push_topics_id"], name: "index_salesforce_streaming_replays_on_push_topics_id" - end - create_table "schools", force: :cascade do |t| t.string "salesforce_id", null: false t.string "name", null: false @@ -502,7 +489,6 @@ add_foreign_key "external_ids", "users" add_foreign_key "oauth_access_grants", "oauth_applications", column: "application_id" add_foreign_key "oauth_access_tokens", "oauth_applications", column: "application_id" - add_foreign_key "salesforce_streaming_replays", "push_topics", column: "push_topics_id" add_foreign_key "users", "oauth_applications", column: "source_application_id" add_foreign_key "users", "schools" end diff --git a/lib/tasks/streaming.rake b/lib/tasks/streaming.rake deleted file mode 100644 index d94e2e06c0..0000000000 --- a/lib/tasks/streaming.rake +++ /dev/null @@ -1,14 +0,0 @@ -namespace :streaming do - desc 'Fetch updates of Contacts and Leads data from salesforce push stream and update in database' - task contact_stream: :environment do - subscriber = SalesforceSubscriber.new - subscriber.create_contact_push_topic - subscriber.subscribe_contacts - end - - task lead_stream: :environment do - subscriber = SalesforceSubscriber.new - subscriber.create_lead_push_topic - subscriber.subscribe_leads - end -end diff --git a/spec/factories/push_topic.rb b/spec/factories/push_topic.rb deleted file mode 100644 index 32d456e283..0000000000 --- a/spec/factories/push_topic.rb +++ /dev/null @@ -1,6 +0,0 @@ -FactoryBot.define do - factory :push_topic do - topic_name { 'ContactChange' } - topic_salesforce_id { "0IF4C0#{SecureRandom.alphanumeric(9)}" } - end -end diff --git a/spec/factories/salesforce_streaming_replays.rb b/spec/factories/salesforce_streaming_replays.rb deleted file mode 100644 index 3b4600e696..0000000000 --- a/spec/factories/salesforce_streaming_replays.rb +++ /dev/null @@ -1,5 +0,0 @@ -FactoryBot.define do - factory :salesforce_streaming_replay do - replay_id { SecureRandom.random_number(3) } - end -end diff --git a/spec/models/push_topic.rb b/spec/models/push_topic.rb deleted file mode 100644 index 214a4ac9a4..0000000000 --- a/spec/models/push_topic.rb +++ /dev/null @@ -1,10 +0,0 @@ -require 'rails_helper' - -RSpec.describe PushTopic, type: :model do - subject(:push_topic) { FactoryBot.create :push_topic } - - it 'can be created' do - expect(push_topic.topic_name).to eq('ContactChange') - expect(push_topic.topic_salesforce_id).to start_with('0IF4C0') - end -end diff --git a/spec/models/salesforce_streaming_replay_spec.rb b/spec/models/salesforce_streaming_replay_spec.rb deleted file mode 100644 index 049c214be9..0000000000 --- a/spec/models/salesforce_streaming_replay_spec.rb +++ /dev/null @@ -1,9 +0,0 @@ -require 'rails_helper' - -RSpec.describe SalesforceStreamingReplay, type: :model do - subject(:streaming_replay) { FactoryBot.create :salesforce_streaming_replay } - - it 'can be created' do - expect(streaming_replay.replay_id).to be >= 0 - end -end