What is the Observer Pattern?
Have you written a system with “events” that need to notify users? Perhaps you email an administrator when a system error occurs, or send a user a “password changed” notification. In a CRM, clients might subscribe to new customers as they come into the system as a result of marketing efforts.
This can be represented using the Observer pattern. Your users have subscribed to one or more events, and they are “observing” new events. The events themselves are “observable” via these event subscriptions.
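To make the vocabulary concrete, here is a minimal sketch using the Observable module from Ruby's standard library. The EventFeed and Subscriber classes are hypothetical names made up for illustration; they are not part of the CRM described in this post.

```ruby
require 'observer' # standard library (shipped as a bundled gem on newer Rubies)

# Hypothetical publisher: anything that includes Observable can be watched.
class EventFeed
  include Observable

  def publish(event)
    changed                  # mark state as changed, otherwise nothing is sent
    notify_observers(event)  # calls #update(event) on every registered observer
  end
end

# Hypothetical subscriber: it only needs to respond to #update.
class Subscriber
  attr_reader :name, :received

  def initialize(name)
    @name = name
    @received = []
  end

  def update(event)
    @received << event
  end
end

feed  = EventFeed.new
admin = Subscriber.new('admin')
feed.add_observer(admin)

feed.publish('SystemError')
feed.publish('NewLead')
```

After the two publish calls, admin.received holds both events in the order they were sent. The feed never needs to know what the subscriber does with them.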
This blog post walks through refactoring an existing event notification system that did not use the Observer pattern and instead relied on multiple passes, run periodically via cron, to calculate an intersection of users and events.
Old pattern
We have a CRM that is written in Ruby on Rails. When a database record is generated for something we want to create an event for, we use an ActiveRecord callback to create a second “event” record after save. This event record is associated with the source record and includes some contextual information such as a type (“NewLead” or “SystemError”) and whether or not it has been processed (defaulted to false). The event also includes some hierarchical attributes so we can enforce authorization in a multi-tenant application.
Once the event records were created, a cron entry fired every minute to check for new events. It queried the database for unswept events and for all active subscriptions, intersected the two, and generated an “event notification” record for every matching pair of event and subscription. This did not scale particularly well: a generic event could go to hundreds of subscribers, which meant one database row per subscriber per event. That data also has no value once the notification has been sent.
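The shape of that intersection can be sketched in a few lines of plain Ruby. The structs below are hypothetical in-memory stand-ins for the database tables, and matching on type alone is a simplification, but the sketch shows why the row count grows with events multiplied by subscribers.

```ruby
# Hypothetical in-memory stand-ins for the database tables.
OldEvent        = Struct.new(:id, :type, :swept)
OldSubscription = Struct.new(:id, :event_type)
Notification    = Struct.new(:event_id, :subscription_id)

# One cron pass: pair every unswept event with every matching subscription
# and build one notification row per pair, then mark the events as swept.
def sweep(events, subscriptions)
  unswept = events.reject(&:swept)
  rows = unswept.flat_map do |event|
    subscriptions
      .select { |sub| sub.event_type == event.type }
      .map { |sub| Notification.new(event.id, sub.id) }
  end
  unswept.each { |event| event.swept = true }
  rows
end

events = [OldEvent.new(1, 'SystemError', false), OldEvent.new(2, 'NewLead', false)]
subscriptions = (1..300).map { |i| OldSubscription.new(i, 'SystemError') }

rows = sweep(events, subscriptions)
puts rows.size # 300: a single event fanned out to 300 subscribers
```

Every one of those rows had to be inserted, read back to build the email, and then kept around for no further purpose.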
Once the events were swept and the “event notification” records were generated, we could send out an email. For usability we have a digest feature: emails are not sent the moment events come in. Instead we collect events for a period of time, then send a summary of everything that has occurred since the last email was sent.
This system did not scale with the volume of events and subscriptions. Long inserts and large datasets caused deadlocks in the database and out-of-memory errors on the server. The goals were to keep the events and the subscriptions, but optimize the workflow so that we could scale further and store less transient data in the database. It goes without saying that the replacement also needed to be robust and easy to reason about, with good test coverage.
New pattern
The Observer pattern I mentioned earlier was the right fit for this particular problem. Instead of a cron-initiated loop with lots of persisted state, we would move to an in-memory event loop and stream the events to the subscriptions.
One wrinkle with the observer pattern was the email digest requirement. Observers are great at taking events and processing them immediately. Collecting them for later required some investigation, but using EventMachine and its PeriodicTimer I was able to preserve this functionality, as we will see.
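The idea behind the digest can be sketched without EventMachine at all. DigestBuffer below is a hypothetical class that is not in our codebase, and it takes the current time as an argument so the behaviour is easy to follow; the real implementation later in this post relies on a timer firing instead of comparing timestamps.

```ruby
# Hypothetical digest buffer: collects events and releases them in batches.
class DigestBuffer
  def initialize(interval_seconds, now: Time.now)
    @interval = interval_seconds
    @last_flush_at = now
    @pending = []
  end

  # Observer-style entry point: remember the event, do not send anything yet.
  def update(event)
    @pending << event
  end

  # Returns the collected batch once the interval has elapsed, or nil when it
  # is too early or there is nothing to send.
  def flush(now: Time.now)
    return nil if @pending.empty? || (now - @last_flush_at) < @interval

    @last_flush_at = now
    batch = @pending.dup
    @pending.clear
    batch
  end
end

start  = Time.at(0)
buffer = DigestBuffer.new(300, now: start) # five-minute digest

buffer.update('NewLead #1')
buffer.update('NewLead #2')

early  = buffer.flush(now: start + 60)  # nil, only one minute has passed
digest = buffer.flush(now: start + 300) # both events in one batch
```

The important property is that update is cheap and immediate, while the expensive part (building and sending an email) happens once per interval.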
The event system
There are two primary areas: the observable and the observer. In Ruby, we can get the observable behaviour by including the standard library's Observable module in our class. This sets up a pseudo interface that our observable and observers implement. Of course Ruby doesn’t have true interfaces, so this is more of a guideline than a compile-time check.
The observable class:
    require 'observer'

    module Events
      class Observable
        include ::Observable

        attr_accessor :last_tick_at

        def initialize
          self.last_tick_at = Time.zone.now
          observers.each { |observer| observer.register(self) }
        end

        def run!
          changed_subscriptions.each do |subscription|
            subscription.reregister(self)
          end

          new_events do |event|
            changed
            notify_observers(event)
          end
        end

        private

        def new_events(&block)
          Event.yield_and_sweep(&block)
        end

        def changed_subscriptions
          Subscription.where("updated_at >= ?", last_tick_at).tap do
            self.last_tick_at = Time.zone.now
          end
        end

        def observers
          Subscription.only_active.not_paused
        end
      end
    end
The observable class will be instantiated by our task runner. During initialization it grabs the current time and registers its observers (subscriptions). Its run! method will be invoked periodically and is responsible for finding and sweeping new events and sending them out to the observers. It also checks whether any subscriptions have been added, changed, or deleted by comparing the updated_at timestamp against the last run. These subscription observers have to re-register with the observable to reflect their changes.
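The observable leans on Event.yield_and_sweep, which is not shown in this post. As a rough sketch of the contract it has to fulfil, here is an in-memory version. SketchEvent and its store are hypothetical; in the Rails app the equivalent would be a query for unprocessed events followed by an update of the processed flag.

```ruby
# Hypothetical in-memory stand-in for the Event model.
class SketchEvent
  attr_reader :id, :type
  attr_accessor :processed

  def initialize(id, type)
    @id = id
    @type = type
    @processed = false
  end

  class << self
    # Stand-in for the events table.
    def store
      @store ||= []
    end

    # Yield each unprocessed event once, then mark it as processed so the
    # next tick does not deliver it again.
    def yield_and_sweep
      store.reject(&:processed).each do |event|
        yield event
        event.processed = true
      end
    end
  end
end

SketchEvent.store << SketchEvent.new(1, 'NewLead') << SketchEvent.new(2, 'SystemError')

first_pass = []
SketchEvent.yield_and_sweep { |event| first_pass << event.id }

second_pass = []
SketchEvent.yield_and_sweep { |event| second_pass << event.id }
```

The first pass sees both events and the second pass sees none, which is what lets run! be called every minute without sending duplicates.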
Next up is the runner:
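    require 'eventmachine'

    module Events
      class Runner
        # Accepts the logger that the daemon script passes in.
        def initialize(logger: nil)
          @logger = logger
        end

        def start
          EM.run do
            observable = Events::Observable.new
            # Fire once right away, then once a minute afterwards.
            run_timer = EventMachine::PeriodicTimer.new(0) { observable.run! }
            run_timer.interval = 1.minute.to_i
          end
        end
      end
    end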
This runner starts an EventMachine loop and instantiates the observable class inside it. Within this loop we add a periodic timer that runs the observable immediately and then once every minute. As we saw in the observable, the run! method re-registers any observers whose subscriptions have changed and then sends new events to all observers.
Finally, we have the observer class:
    module Events
      module Observer
        attr_accessor :observed_events, :timer

        def register(observable)
          observable.add_observer(self)
          self.observed_events ||= []
          self.timer = EventMachine::PeriodicTimer.new(self.wait_interval) { process_events }
        end

        def unregister(observable)
          observable.delete_observer(self)
          self.timer.try(:cancel)
        end

        def reregister(observable)
          unregister(observable)
          register(observable)
        end

        def update(observed_event)
          return unless authorized_for_context(observed_event)
          self.observed_events.push(observed_event)
        end

        def process_events
          return unless observed_events.present?
          EventMessageMailer.delay(queue: :email).event_message(observed_events.map(&:id), self.user)
          observed_events.clear
        end

        private

        def authorized_for_context(event)
          return false unless self.event_type.blank? || self.event_type == event.type
          return false unless self.agency_id.blank? || self.agency_id == event.agency_id
          return false unless self.client_id.blank? || self.client_id == event.client_id
          return false unless self.business_unit_id.blank? || self.business_unit_id == event.business_unit_id
          true
        end
      end
    end
This is a module that can be mixed into an ActiveRecord “Subscription” model. It assumes the model has attributes for the hierarchy the subscription covers (agency_id, client_id, business_unit_id) and an attribute for the type of event the subscription is for (event_type). For an event to be collected, it must pass the authorization checks and must be of the appropriate event type. A blank attribute on the subscription means “match anything” at that level.
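The matching rule is easier to see in isolation. The sketch below uses plain structs with hypothetical names (ScopedEvent, ScopedSubscription) instead of ActiveRecord models, and checks for nil where the real module uses blank?, so it runs without Rails.

```ruby
ScopedEvent = Struct.new(:type, :agency_id, :client_id, :business_unit_id,
                         keyword_init: true)

ScopedSubscription = Struct.new(:event_type, :agency_id, :client_id, :business_unit_id,
                                keyword_init: true) do
  # A nil attribute acts as a wildcard; a set attribute must match exactly.
  def matches?(event)
    return false unless event_type.nil? || event_type == event.type

    %i[agency_id client_id business_unit_id].all? do |scope|
      self[scope].nil? || self[scope] == event[scope]
    end
  end
end

event = ScopedEvent.new(type: 'NewLead', agency_id: 1, client_id: 10, business_unit_id: 100)

agency_wide  = ScopedSubscription.new(event_type: 'NewLead', agency_id: 1)
other_client = ScopedSubscription.new(event_type: 'NewLead', agency_id: 1, client_id: 11)
any_type     = ScopedSubscription.new(agency_id: 1)

agency_wide.matches?(event)  # true, client and business unit are wildcards
other_client.matches?(event) # false, the client_id differs
any_type.matches?(event)     # true, the event type is a wildcard too
```

The more attributes a subscription sets, the narrower the slice of events it collects.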
Of note is wait_interval, which drives the digest functionality. Each subscription has an attribute for how long to wait: 5 minutes, 1 hour, 1 day, and so on. The process_events call is made whenever the PeriodicTimer fires on that interval. It queues the email and then clears out the events it has just processed.
Let’s see a diagram of how all this will work:
Not shown is the EventMessageMailer. It is just a standard ActionMailer mailer that takes an array of event IDs and renders the events as a list. The message is then sent to the user’s email address via a background job.
Observable Benefits
Replacing our existing code with the Observer pattern brought a few benefits:
- Efficiency – Subscriptions are not recalculated every time we sweep for events.
- Storage size – Because the event runner continues to run in the foreground, we don’t have to worry about persisting a lot of in between state. The concept of event subscriptions lives in memory. Messages are constructed on demand and emailed asynchronously.
- Codebase size – Almost all of the cron entries, task runners, sweepers, and calculations on the event and subscription intersections have been removed because they aren’t necessary. We now have zero cron entries, and one task runner that is responsible for picking up new events and sending to the observers.
- Agreed upon pattern – While only a benefit if you know the pattern, there is a much greater chance that someone will recognize an observer than a custom solution.
- Interface checking – Because we are using the Ruby Observable module, we get some safety in adding new observers, and when changing our observable. If the observer doesn’t implement the update method, an exception will be raised when registered with the observable.
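The interface check from the last point is easy to demonstrate. In this sketch, Feed and IncompleteObserver are hypothetical classes; the only real API used is add_observer from the standard library's Observable module.

```ruby
require 'observer'

class Feed
  include Observable
end

# Hypothetical observer that forgot to implement #update.
class IncompleteObserver; end

feed = Feed.new

error = begin
  feed.add_observer(IncompleteObserver.new)
  nil
rescue NoMethodError => e
  e # add_observer rejects objects that do not respond to #update
end
```

The error is raised at registration time, not when the first event is sent, so a broken observer is caught as soon as the process boots. It is still a runtime check, which is why good test coverage matters.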
Task runner
Now that we have implemented the observer pattern, we need some scripting to keep it running and recover from any crashes. For this I used the Daemons gem:
    require 'daemons'
    require File.expand_path('../../config/environment', __FILE__)
    require Rails.root.join('app/models/events/runner')

    Daemons.run_proc('events', {
      dir: Rails.root.join('tmp/pids/'),
      dir_mode: :normal,
      log_dir: Rails.root.join('log/'),
      backtrace: true,
      monitor: true
    }) do
      logger = Logger.new(Rails.root.join('log/events.log'))
      Events::Runner.new(logger: logger).start
    end
The observable can now be instantiated with a script/events.rb start command. In the event of a crash, I get log output of the stacktrace, and an automatic restart.
What is next
Currently, there isn’t a mechanism to serialize the events that subscriptions are holding in memory. Most subscriptions use a one-minute interval, the service is rarely restarted, and the risk of a missed notification was low, so this problem was left for later. The likely approach is to trap SIGTERM (SIGKILL cannot be trapped) and persist the unprocessed events for each observer in a way that is recoverable on restart. A text file is probably sufficient, as this data isn’t valuable long term.
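A rough sketch of what that could look like is below. None of this exists in our codebase yet: dump_pending, load_pending, and persist_on_term are hypothetical helpers, and the file holds a JSON map of subscription id to pending event ids. Only SIGTERM is handled, since the operating system does not let a process trap SIGKILL.

```ruby
require 'json'

# Hypothetical helper: pending maps a subscription id to the ids of events
# that have been collected but not yet emailed.
def dump_pending(path, pending)
  File.write(path, JSON.generate(pending))
end

# Hypothetical helper: read the map back on boot.
def load_pending(path)
  return {} unless File.exist?(path)

  # JSON object keys are always strings, so convert them back to integers.
  JSON.parse(File.read(path)).transform_keys(&:to_i)
end

# Hypothetical hook, called once at boot: write the pending events to disk
# when the process is asked to terminate, then exit.
def persist_on_term(path, pending)
  Signal.trap('TERM') do
    dump_pending(path, pending)
    exit
  end
end
```

On restart, the runner would call load_pending before registering observers and hand each subscription its leftover event ids. How that interacts with the EventMachine loop is an open question I have not tested.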
Learn More
If you want to learn more about the observer pattern, I recommend Head First Design Patterns. The content is good, but the 1990s-style stock photos will make you groan.
Additionally, SourceMaking has great information on many design patterns including the observer.