From 272d4081a2494c48c3bb1b804fab952594cf8acf Mon Sep 17 00:00:00 2001
From: Eugen Rochko <eugen@zeonfederated.com>
Date: Fri, 28 Jun 2019 21:52:24 +0200
Subject: [PATCH] WIP

---
 .../activitypub/inboxes_controller.rb         |  7 ++
 app/controllers/api/push_controller.rb        | 73 -------------------
 app/controllers/api/salmon_controller.rb      | 37 ----------
 .../api/subscriptions_controller.rb           | 51 -------------
 app/lib/ostatus/atom_serializer.rb            |  2 -
 app/serializers/webfinger_serializer.rb       |  1 -
 app/services/subscribe_service.rb             | 52 -------------
 app/services/unsubscribe_service.rb           | 30 --------
 app/views/accounts/show.html.haml             |  1 -
 app/views/well_known/webfinger/show.xml.ruby  |  5 --
 app/workers/activitypub/delivery_worker.rb    |  4 +-
 app/workers/pubsubhubbub/delivery_worker.rb   |  6 +-
 config/initializers/instrumentation.rb        | 36 +++++++++
 config/navigation.rb                          |  1 -
 config/routes.rb                              | 11 ---
 15 files changed, 47 insertions(+), 270 deletions(-)
 delete mode 100644 app/controllers/api/push_controller.rb
 delete mode 100644 app/controllers/api/salmon_controller.rb
 delete mode 100644 app/controllers/api/subscriptions_controller.rb

diff --git a/app/controllers/activitypub/inboxes_controller.rb b/app/controllers/activitypub/inboxes_controller.rb
index a0b7532c2e..961e82f21e 100644
--- a/app/controllers/activitypub/inboxes_controller.rb
+++ b/app/controllers/activitypub/inboxes_controller.rb
@@ -5,6 +5,7 @@ class ActivityPub::InboxesController < Api::BaseController
   include JsonLdHelper
 
   before_action :set_account
+  around_action :instrument_ingress
 
   def create
     if unknown_deleted_account?
@@ -51,4 +52,10 @@ class ActivityPub::InboxesController < Api::BaseController
   def process_payload
     ActivityPub::ProcessingWorker.perform_async(signed_request_account.id, body, @account&.id)
   end
+
+  def instrument_ingress
+    ActiveSupport::Notifications.instrument('activitypub.ingress', domain: signed_request_account.domain, ip: request.remote_ip) do
+      yield
+    end
+  end
 end
diff --git a/app/controllers/api/push_controller.rb b/app/controllers/api/push_controller.rb
deleted file mode 100644
index e04d19125b..0000000000
--- a/app/controllers/api/push_controller.rb
+++ /dev/null
@@ -1,73 +0,0 @@
-# frozen_string_literal: true
-
-class Api::PushController < Api::BaseController
-  include SignatureVerification
-
-  def update
-    response, status = process_push_request
-    render plain: response, status: status
-  end
-
-  private
-
-  def process_push_request
-    case hub_mode
-    when 'subscribe'
-      Pubsubhubbub::SubscribeService.new.call(account_from_topic, hub_callback, hub_secret, hub_lease_seconds, verified_domain)
-    when 'unsubscribe'
-      Pubsubhubbub::UnsubscribeService.new.call(account_from_topic, hub_callback)
-    else
-      ["Unknown mode: #{hub_mode}", 422]
-    end
-  end
-
-  def hub_mode
-    params['hub.mode']
-  end
-
-  def hub_topic
-    params['hub.topic']
-  end
-
-  def hub_callback
-    params['hub.callback']
-  end
-
-  def hub_lease_seconds
-    params['hub.lease_seconds']
-  end
-
-  def hub_secret
-    params['hub.secret']
-  end
-
-  def account_from_topic
-    if hub_topic.present? && local_domain? && account_feed_path?
-      Account.find_local(hub_topic_params[:username])
-    end
-  end
-
-  def hub_topic_params
-    @_hub_topic_params ||= Rails.application.routes.recognize_path(hub_topic_uri.path)
-  end
-
-  def hub_topic_uri
-    @_hub_topic_uri ||= Addressable::URI.parse(hub_topic).normalize
-  end
-
-  def local_domain?
-    TagManager.instance.web_domain?(hub_topic_domain)
-  end
-
-  def verified_domain
-    return signed_request_account.domain if signed_request_account
-  end
-
-  def hub_topic_domain
-    hub_topic_uri.host + (hub_topic_uri.port ? ":#{hub_topic_uri.port}" : '')
-  end
-
-  def account_feed_path?
-    hub_topic_params[:controller] == 'accounts' && hub_topic_params[:action] == 'show' && hub_topic_params[:format] == 'atom'
-  end
-end
diff --git a/app/controllers/api/salmon_controller.rb b/app/controllers/api/salmon_controller.rb
deleted file mode 100644
index ac5f3268d8..0000000000
--- a/app/controllers/api/salmon_controller.rb
+++ /dev/null
@@ -1,37 +0,0 @@
-# frozen_string_literal: true
-
-class Api::SalmonController < Api::BaseController
-  include SignatureVerification
-
-  before_action :set_account
-  respond_to :txt
-
-  def update
-    if verify_payload?
-      process_salmon
-      head 202
-    elsif payload.present?
-      render plain: signature_verification_failure_reason, status: 401
-    else
-      head 400
-    end
-  end
-
-  private
-
-  def set_account
-    @account = Account.find(params[:id])
-  end
-
-  def payload
-    @_payload ||= request.body.read
-  end
-
-  def verify_payload?
-    payload.present? && VerifySalmonService.new.call(payload)
-  end
-
-  def process_salmon
-    SalmonWorker.perform_async(@account.id, payload.force_encoding('UTF-8'))
-  end
-end
diff --git a/app/controllers/api/subscriptions_controller.rb b/app/controllers/api/subscriptions_controller.rb
deleted file mode 100644
index 89007f3d6e..0000000000
--- a/app/controllers/api/subscriptions_controller.rb
+++ /dev/null
@@ -1,51 +0,0 @@
-# frozen_string_literal: true
-
-class Api::SubscriptionsController < Api::BaseController
-  before_action :set_account
-  respond_to :txt
-
-  def show
-    if subscription.valid?(params['hub.topic'])
-      @account.update(subscription_expires_at: future_expires)
-      render plain: encoded_challenge, status: 200
-    else
-      head 404
-    end
-  end
-
-  def update
-    if subscription.verify(body, request.headers['HTTP_X_HUB_SIGNATURE'])
-      ProcessingWorker.perform_async(@account.id, body.force_encoding('UTF-8'))
-    end
-
-    head 200
-  end
-
-  private
-
-  def subscription
-    @_subscription ||= @account.subscription(
-      api_subscription_url(@account.id)
-    )
-  end
-
-  def body
-    @_body ||= request.body.read
-  end
-
-  def encoded_challenge
-    HTMLEntities.new.encode(params['hub.challenge'])
-  end
-
-  def future_expires
-    Time.now.utc + lease_seconds_or_default
-  end
-
-  def lease_seconds_or_default
-    (params['hub.lease_seconds'] || 1.day).to_i.seconds
-  end
-
-  def set_account
-    @account = Account.find(params[:id])
-  end
-end
diff --git a/app/lib/ostatus/atom_serializer.rb b/app/lib/ostatus/atom_serializer.rb
index 9a05d96cf9..f5c0e85cae 100644
--- a/app/lib/ostatus/atom_serializer.rb
+++ b/app/lib/ostatus/atom_serializer.rb
@@ -53,8 +53,6 @@ class OStatus::AtomSerializer
     append_element(feed, 'link', nil, rel: :alternate, type: 'text/html', href: ::TagManager.instance.url_for(account))
     append_element(feed, 'link', nil, rel: :self, type: 'application/atom+xml', href: account_url(account, format: 'atom'))
     append_element(feed, 'link', nil, rel: :next, type: 'application/atom+xml', href: account_url(account, format: 'atom', max_id: stream_entries.last.id)) if stream_entries.size == 20
-    append_element(feed, 'link', nil, rel: :hub, href: api_push_url)
-    append_element(feed, 'link', nil, rel: :salmon, href: api_salmon_url(account.id))
 
     stream_entries.each do |stream_entry|
       feed << entry(stream_entry)
diff --git a/app/serializers/webfinger_serializer.rb b/app/serializers/webfinger_serializer.rb
index 8c0b077020..4220f697e6 100644
--- a/app/serializers/webfinger_serializer.rb
+++ b/app/serializers/webfinger_serializer.rb
@@ -18,7 +18,6 @@ class WebfingerSerializer < ActiveModel::Serializer
       { rel: 'http://webfinger.net/rel/profile-page', type: 'text/html', href: short_account_url(object) },
       { rel: 'http://schemas.google.com/g/2010#updates-from', type: 'application/atom+xml', href: account_url(object, format: 'atom') },
       { rel: 'self', type: 'application/activity+json', href: account_url(object) },
-      { rel: 'salmon', href: api_salmon_url(object.id) },
       { rel: 'magic-public-key', href: "data:application/magic-public-key,#{object.magic_key}" },
       { rel: 'http://ostatus.org/schema/1.0/subscribe', template: "#{authorize_interaction_url}?uri={uri}" },
     ]
diff --git a/app/services/subscribe_service.rb b/app/services/subscribe_service.rb
index 83fd64396a..c441e8598f 100644
--- a/app/services/subscribe_service.rb
+++ b/app/services/subscribe_service.rb
@@ -2,57 +2,5 @@
 
 class SubscribeService < BaseService
   def call(account)
-    return if account.hub_url.blank?
-
-    @account        = account
-    @account.secret = SecureRandom.hex
-
-    build_request.perform do |response|
-      if response_failed_permanently? response
-        # We're not allowed to subscribe. Fail and move on.
-        @account.secret = ''
-        @account.save!
-      elsif response_successful? response
-        # The subscription will be confirmed asynchronously.
-        @account.save!
-      else
-        # The response was either a 429 rate limit, or a 5xx error.
-        # We need to retry at a later time. Fail loudly!
-        raise Mastodon::UnexpectedResponseError, response
-      end
-    end
-  end
-
-  private
-
-  def build_request
-    request = Request.new(:post, @account.hub_url, form: subscription_params)
-    request.on_behalf_of(some_local_account) if some_local_account
-    request
-  end
-
-  def subscription_params
-    {
-      'hub.topic': @account.remote_url,
-      'hub.mode': 'subscribe',
-      'hub.callback': api_subscription_url(@account.id),
-      'hub.verify': 'async',
-      'hub.secret': @account.secret,
-      'hub.lease_seconds': 7.days.seconds,
-    }
-  end
-
-  def some_local_account
-    @some_local_account ||= Account.local.without_suspended.first
-  end
-
-  # Any response in the 3xx or 4xx range, except for 429 (rate limit)
-  def response_failed_permanently?(response)
-    (response.status.redirect? || response.status.client_error?) && !response.status.too_many_requests?
-  end
-
-  # Any response in the 2xx range
-  def response_successful?(response)
-    response.status.success?
   end
 end
diff --git a/app/services/unsubscribe_service.rb b/app/services/unsubscribe_service.rb
index 95c1fb4fc0..5fc40e63c6 100644
--- a/app/services/unsubscribe_service.rb
+++ b/app/services/unsubscribe_service.rb
@@ -2,35 +2,5 @@
 
 class UnsubscribeService < BaseService
   def call(account)
-    return if account.hub_url.blank?
-
-    @account = account
-
-    begin
-      build_request.perform do |response|
-        Rails.logger.debug "PuSH unsubscribe for #{@account.acct} failed: #{response.status}" unless response.status.success?
-      end
-    rescue HTTP::Error, OpenSSL::SSL::SSLError => e
-      Rails.logger.debug "PuSH unsubscribe for #{@account.acct} failed: #{e}"
-    end
-
-    @account.secret = ''
-    @account.subscription_expires_at = nil
-    @account.save!
-  end
-
-  private
-
-  def build_request
-    Request.new(:post, @account.hub_url, form: subscription_params)
-  end
-
-  def subscription_params
-    {
-      'hub.topic': @account.remote_url,
-      'hub.mode': 'unsubscribe',
-      'hub.callback': api_subscription_url(@account.id),
-      'hub.verify': 'async',
-    }
   end
 end
diff --git a/app/views/accounts/show.html.haml b/app/views/accounts/show.html.haml
index 950e618477..de7d2a8ba3 100644
--- a/app/views/accounts/show.html.haml
+++ b/app/views/accounts/show.html.haml
@@ -7,7 +7,6 @@
   - if @account.user&.setting_noindex
     %meta{ name: 'robots', content: 'noindex' }/
 
-  %link{ rel: 'salmon', href: api_salmon_url(@account.id) }/
   %link{ rel: 'alternate', type: 'application/atom+xml', href: account_url(@account, format: 'atom') }/
   %link{ rel: 'alternate', type: 'application/rss+xml', href: account_url(@account, format: 'rss') }/
   %link{ rel: 'alternate', type: 'application/activity+json', href: ActivityPub::TagManager.instance.uri_for(@account) }/
diff --git a/app/views/well_known/webfinger/show.xml.ruby b/app/views/well_known/webfinger/show.xml.ruby
index 968c8c1380..c82cdb7b3d 100644
--- a/app/views/well_known/webfinger/show.xml.ruby
+++ b/app/views/well_known/webfinger/show.xml.ruby
@@ -25,11 +25,6 @@ doc << Ox::Element.new('XRD').tap do |xrd|
     link['href']     = account_url(@account)
   end
 
-  xrd << Ox::Element.new('Link').tap do |link|
-    link['rel']      = 'salmon'
-    link['href']     = api_salmon_url(@account.id)
-  end
-
   xrd << Ox::Element.new('Link').tap do |link|
     link['rel']      = 'magic-public-key'
     link['href']     = "data:application/magic-public-key,#{@account.magic_key}"
diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb
index 5e4c391f0d..bc081f7b96 100644
--- a/app/workers/activitypub/delivery_worker.rb
+++ b/app/workers/activitypub/delivery_worker.rb
@@ -18,7 +18,9 @@ class ActivityPub::DeliveryWorker
     @source_account = Account.find(source_account_id)
     @inbox_url      = inbox_url
 
-    perform_request
+    ActiveSupport::Notifications.instrument('activitypub.egress', domain: Addressable::URI.parse(@inbox_url).normalized_host) do
+      perform_request
+    end
 
     failure_tracker.track_success!
   rescue => e
diff --git a/app/workers/pubsubhubbub/delivery_worker.rb b/app/workers/pubsubhubbub/delivery_worker.rb
index 619bfa48aa..077a8d6761 100644
--- a/app/workers/pubsubhubbub/delivery_worker.rb
+++ b/app/workers/pubsubhubbub/delivery_worker.rb
@@ -52,11 +52,7 @@ class Pubsubhubbub::DeliveryWorker
   end
 
   def link_header
-    LinkHeader.new([hub_link_header, self_link_header]).to_s
-  end
-
-  def hub_link_header
-    [api_push_url, [%w(rel hub)]]
+    LinkHeader.new([self_link_header]).to_s
   end
 
   def self_link_header
diff --git a/config/initializers/instrumentation.rb b/config/initializers/instrumentation.rb
index 8483f2be2e..476ddc7a01 100644
--- a/config/initializers/instrumentation.rb
+++ b/config/initializers/instrumentation.rb
@@ -16,3 +16,39 @@ ActiveSupport::Notifications.subscribe(/process_action.action_controller/) do |*
   ActiveSupport::Notifications.instrument :performance, action: :measure, measurement: "#{key}.view_time", value: event.payload[:view_runtime]
   ActiveSupport::Notifications.instrument :performance, measurement: "#{key}.status.#{status}"
 end
+
+EXPIRE_AFTER = 2.days.seconds.freeze
+
+ActiveSupport::Notifications.subscribe(/activitypub.(ingress|egress)/) do |*args|
+  event   = ActiveSupport::Notifications::Event.new(*args)
+  buckets = [event.started.to_i % 3_600, event.started.to_i % 86_400]
+
+  case event.name
+  when 'activitypub.ingress'
+    buckets.each do |bucket|
+      Redis.current.hincrby("counters:activitypub.ingress:#{bucket}", "domain:#{event.payload[:domain]}", 1)
+      Redis.current.hincrby("counters:activitypub.ingress:#{bucket}", "ip:#{event.payload[:ip]}", 1)
+      Redis.current.expire("counters:activitypub.ingress:#{bucket}", EXPIRE_AFTER)
+    end
+  when 'activitypub.egress'
+    buckets.each do |bucket|
+      Redis.current.hincrby("counters:activitypub.egress:#{bucket}", "domain:#{event.payload[:domain]}", 1)
+      Redis.current.expire("counters:activitypub.egress:#{bucket}", EXPIRE_AFTER)
+    end
+  end
+end
+
+def anomalies
+  now = Time.now.to_i
+
+  [3_600, 86_400].each do |interval|
+    current_period = Redis.current.hgetall("counters:activitypub.ingress:#{now % interval}")
+    past_period    = Redis.current.hgetall("counters:activitypub.ingress:#{now % interval - interval}")
+
+    current_period.each_pair do |key, value|
+      if value > past_period[key]
+        # Sound the alarm!
+      end
+    end
+  end
+end
diff --git a/config/navigation.rb b/config/navigation.rb
index df10241892..ef845d1fc5 100644
--- a/config/navigation.rb
+++ b/config/navigation.rb
@@ -48,7 +48,6 @@ SimpleNavigation::Configuration.run do |navigation|
       s.item :settings, safe_join([fa_icon('cogs fw'), t('admin.settings.title')]), edit_admin_settings_url, if: -> { current_user.admin? }, highlights_on: %r{/admin/settings}
       s.item :custom_emojis, safe_join([fa_icon('smile-o fw'), t('admin.custom_emojis.title')]), admin_custom_emojis_url, highlights_on: %r{/admin/custom_emojis}
       s.item :relays, safe_join([fa_icon('exchange fw'), t('admin.relays.title')]), admin_relays_url, if: -> { current_user.admin? }, highlights_on: %r{/admin/relays}
-      s.item :subscriptions, safe_join([fa_icon('paper-plane-o fw'), t('admin.subscriptions.title')]), admin_subscriptions_url, if: -> { current_user.admin? }
       s.item :sidekiq, safe_join([fa_icon('diamond fw'), 'Sidekiq']), sidekiq_url, link_html: { target: 'sidekiq' }, if: -> { current_user.admin? }
       s.item :pghero, safe_join([fa_icon('database fw'), 'PgHero']), pghero_url, link_html: { target: 'pghero' }, if: -> { current_user.admin? }
     end
diff --git a/config/routes.rb b/config/routes.rb
index 764db8db2e..e6bc57a315 100644
--- a/config/routes.rb
+++ b/config/routes.rb
@@ -154,7 +154,6 @@ Rails.application.routes.draw do
   namespace :admin do
     get '/dashboard', to: 'dashboard#index'
 
-    resources :subscriptions, only: [:index]
     resources :domain_blocks, only: [:new, :create, :show, :destroy]
     resources :email_domain_blocks, only: [:index, :new, :create, :destroy]
     resources :action_logs, only: [:index]
@@ -257,16 +256,6 @@ Rails.application.routes.draw do
   get '/admin', to: redirect('/admin/dashboard', status: 302)
 
   namespace :api do
-    # PubSubHubbub outgoing subscriptions
-    resources :subscriptions, only: [:show]
-    post '/subscriptions/:id', to: 'subscriptions#update'
-
-    # PubSubHubbub incoming subscriptions
-    post '/push', to: 'push#update', as: :push
-
-    # Salmon
-    post '/salmon/:id', to: 'salmon#update', as: :salmon
-
     # OEmbed
     get '/oembed', to: 'oembed#show', as: :oembed
 
-- 
GitLab