Have you ever come across API services that restrict the number of calls you can make within a time period? This is very common these days, and developers have to work around the limitation both to avoid flooding the service and to keep costs down. Neither the limitation nor the solution is new to most of us, but every language has its own way of implementing it. We are going to look at how we can solve this using caching with Redis and pattern matching in Elixir.
Suppose we have an application "oscar" that fetches movie ratings from 5 review portals using their APIs. The problem does not sound complicated, so let's add some more layers to it.
- Users can request a movie to be rated, but results are curated asynchronously by Oscar
- All of the 5 APIs have rate limits; calling them too frequently produces a 429 HTTP status code
Let's start with a basic Elixir module:
defmodule Oscar do
  require Logger

  alias Oscar.{Imdb, RottenTomatoes, Sify, Netflix, AmazonPrime}

  def rate_movie(name) do
    Logger.info("Fetching movie ratings for #{name}")

    ratings = %{
      imdb: Imdb.get_rating!(name),
      rt: RottenTomatoes.get_rating!(name),
      sify: Sify.get_rating!(name),
      netflix: Netflix.get_rating!(name),
      amazon: AmazonPrime.get_rating!(name)
    }

    # save/2 (not shown) persists the curated ratings
    save(name, ratings)
  end
end
Nothing fancy here. We have 5 clients, namely Imdb, RottenTomatoes, Sify, Netflix, and AmazonPrime, all of which allow us to get_rating!/1 for a movie.
defmodule Oscar.Imdb do
  require Logger

  def get_rating!(name) do
    case HTTPoison.get!("https://imdb.com/movies/#{name}") do
      %HTTPoison.Response{status_code: 200, body: body} ->
        Logger.debug("Got Imdb rating for #{name}")
        parse_rating(body)

      _ ->
        raise HTTPoison.Error, reason: "Oops! we got a non 200 status code from Imdb"
    end
  end

  # ... other functions omitted for brevity
end
As you can see, a 200 status code is required for us to parse the rating; for the purpose of this post, any other status code is unacceptable. Assuming we have implemented the other 4 clients in the same way, we move on to calling these APIs from a background worker.
Unlike Ruby where you'd use Sidekiq, or Python where you'd use Celery, Elixir most often does not need such heavy background workers. But our assumption is that the problem eventually requires Redis-backed caching, so let's use Exq as the background worker.
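For reference, an Exq worker is just a module with a perform function whose arity matches the enqueued arguments. A minimal sketch wrapping the call above (the module body here is an assumption, it is not shown in the original) could look like this:
defmodule Oscar.OscarWorker do
  # Exq calls perform/1 with the args passed to Exq.enqueue/4.
  def perform(name) do
    Oscar.rate_movie(name)
  end
end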
Case 1: All APIs succeed:
iex> {:ok, job_id} = Exq.enqueue(Exq, "default", OscarWorker, ["interstellar"])
[info] Fetching movie ratings for interstellar
[debug] Got Imdb rating for interstellar
[debug] Got RottenTomatoes rating for interstellar
[debug] Got Sify rating for interstellar
[debug] Got Netflix rating for interstellar
[debug] Got AmazonPrime rating for interstellar
[info] Saved rating for interstellar
[info] Elixir.Oscar.OscarWorker[e90f871a-a2dd-4a86-8914-6e94dfde09b3] success: 1250ms
Awesome! That worked like a charm.
Case 2: AmazonPrime blocks with 429:
iex> {:ok, job_id} = Exq.enqueue(Exq, "default", OscarWorker, ["interstellar"])
[info] Fetching movie ratings for interstellar
[debug] Got Imdb rating for interstellar
[debug] Got RottenTomatoes rating for interstellar
[debug] Got Sify rating for interstellar
[debug] Got Netflix rating for interstellar
** (HTTPoison.Error) "Oops! we got a non 200 status code from AmazonPrime"
...
[info] Elixir.Oscar.OscarWorker[e90f871a-a2dd-4a86-8914-6e94dfde09b3] fail: 1270ms
[info] Queueing job e90f871a-a2dd-4a86-8914-6e94dfde09b3 to retry in 54.0 seconds
.
.
[info] Fetching movie ratings for interstellar
[debug] Got Imdb rating for interstellar
[debug] Got RottenTomatoes rating for interstellar
[debug] Got Sify rating for interstellar
[debug] Got Netflix rating for interstellar
[debug] Got AmazonPrime rating for interstellar
[info] Saved rating for interstellar
[info] Elixir.Oscar.OscarWorker[e90f871a-a2dd-4a86-8914-6e94dfde09b3] success: 2750ms
Here the worker had to retry once to get all the ratings. With 5 services, it's quite possible that several of them hit their rate limits within one job, delaying the results or even turning it into a dead job.
How can we reduce the number of calls we make to the same service for the very same movie to exactly once per job?
When we look at caching options, there are other ways to store the data we have already queried in Redis. For this post, however, we are going to cache on a per-job basis: every time we fetch a rating from one portal, we cache it. Caching by movie name is also possible, but then the review can get outdated and we would have to manage re-fetching based on a timestamp. Let's not get into that here.
Simply caching the data doesn't help by itself; we also need to look the cache up before calling the API. Let's make some changes to use the cache:
defmodule Oscar do
  require Logger

  alias Oscar.{Imdb, RottenTomatoes, Sify, Netflix, AmazonPrime}
  alias Oscar.RedisCache

  @default %{
    imdb: nil,
    rt: nil,
    sify: nil,
    netflix: nil,
    amazon_prime: nil,
    uid: nil
  }

  def rate_movie(uid, name) do
    Logger.info("Fetching movie ratings for #{name}", uid: uid)

    metadata = get_cached_or_new_metadata(uid)

    metadata
    |> get_rating("imdb", name)
    |> cache()
    |> get_rating("rt", name)
    |> cache()
    |> get_rating("sify", name)
    |> cache()
    |> get_rating("netflix", name)
    |> cache()
    |> get_rating("amazon_prime", name)
    |> cache()
    |> save(name)
  end

  defp get_cached_or_new_metadata(uid) do
    case RedisCache.get(uid) do
      {:ok, nil} -> Map.put(@default, :uid, uid)
      {:ok, cached} -> Jason.decode!(cached, keys: :atoms)
    end
  end

  defp cache(metadata) do
    :ok = RedisCache.set(metadata.uid, Jason.encode!(metadata))
    metadata
  end

  defp get_rating(%{imdb: imdb, uid: uid} = metadata, "imdb", name) when is_float(imdb) do
    Logger.debug("Imdb rating for #{name} exists in cache", uid: uid)
    metadata
  end

  defp get_rating(metadata, "imdb", name) do
    Map.put(metadata, :imdb, Imdb.get_rating!(name))
  end

  defp get_rating(%{rt: rt, uid: uid} = metadata, "rt", name) when is_float(rt) do
    Logger.debug("RottenTomatoes rating for #{name} exists in cache", uid: uid)
    metadata
  end

  defp get_rating(metadata, "rt", name) do
    Map.put(metadata, :rt, RottenTomatoes.get_rating!(name))
  end

  # ...other similar functions omitted for brevity
end
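The Oscar.RedisCache module used above is just a thin wrapper around Redis and its implementation is not part of this post. A minimal sketch using Redix, assuming a connection registered under the name :redix and an arbitrary one-hour TTL, could look like this:
defmodule Oscar.RedisCache do
  # Sketch only: GET returns {:ok, nil} when the key is absent, which is
  # exactly what get_cached_or_new_metadata/1 above relies on.
  @ttl_seconds "3600"

  def get(uid) do
    Redix.command(:redix, ["GET", uid])
  end

  def set(uid, value) do
    case Redix.command(:redix, ["SET", uid, value, "EX", @ttl_seconds]) do
      {:ok, "OK"} -> :ok
      error -> error
    end
  end
end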
Now we revisit our cases once again and see if these changes helped. We still wrap the call in an Exq OscarWorker, but since the function is now Oscar.rate_movie/2, the worker has to supply a uid that stays stable across retries so that every retry picks up the same cache entry. A minimal sketch of such a worker (deriving the uid from the movie name is just an assumption for this post):
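defmodule Oscar.OscarWorker do
  # Sketch only: the uid just has to be stable across retries of the same
  # job, so deriving it from the movie name is one simple option.
  def perform(name) do
    uid = "oscar:ratings:" <> name
    Oscar.rate_movie(uid, name)
  end
end
With that in place, we can queue the job exactly as before.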
Case 1: All APIs succeed:
iex> {:ok, job_id} = Exq.enqueue(Exq, "default", OscarWorker, ["interstellar"])
[info] Fetching movie ratings for interstellar
[debug] Got Imdb rating for interstellar
[debug] Got RottenTomatoes rating for interstellar
[debug] Got Sify rating for interstellar
[debug] Got Netflix rating for interstellar
[debug] Got AmazonPrime rating for interstellar
[info] Saved rating for interstellar
[info] Elixir.Oscar.OscarWorker[ab7f871a-c4ae-4a86-8914-6e94dfde09b3] success: 1250ms
As expected there are no issues here; let's move on to the next case.
Case 2: Sify and AmazonPrime block with 429:
iex> {:ok, job_id} = Exq.enqueue(Exq, "default", OscarWorker, ["interstellar"])
[info] Fetching movie ratings for interstellar
[debug] Got Imdb rating for interstellar
[debug] Got RottenTomatoes rating for interstellar
** (HTTPoison.Error) "Oops! we got a non 200 status code from Sify" <---- first failure
...
[info] Elixir.Oscar.OscarWorker[ab7f871a-c4ae-4a86-8914-6e94dfde09b3] fail: 1270ms
[info] Queueing job ab7f871a-c4ae-4a86-8914-6e94dfde09b3 to retry in 54.0 seconds
.
.
[info] Fetching movie ratings for interstellar
[debug] Imdb rating for interstellar exists in cache
[debug] RottenTomatoes rating for interstellar exists in cache
[debug] Got Sify rating for interstellar
[debug] Got Netflix rating for interstellar
** (HTTPoison.Error) "Oops! we got a non 200 status code from AmazonPrime" <---- second failure
...
[info] Elixir.Oscar.OscarWorker[ab7f871a-c4ae-4a86-8914-6e94dfde09b3] fail: 1570ms
[info] Queueing job ab7f871a-c4ae-4a86-8914-6e94dfde09b3 to retry in 125.0 seconds
.
.
[info] Fetching movie ratings for interstellar
[debug] Imdb rating for interstellar exists in cache
[debug] RottenTomatoes rating for interstellar exists in cache
[debug] Sify rating for interstellar exists in cache
[debug] Netflix rating for interstellar exists in cache
[debug] Got AmazonPrime rating for interstellar
[info] Saved rating for interstellar
[info] Elixir.Oscar.OscarWorker[ab7f871a-c4ae-4a86-8914-6e94dfde09b3] success: 250ms
iex> {:ok, cached} = Oscar.RedisCache.get(uid)
iex> Jason.decode!(cached, keys: :atoms)
%{
imdb: 4.90,
rt: 4.75,
sify: 4.8,
netflix: 4.85,
amazon_prime: 4.90
}
Using caching was straightforward, and the solution works as intended. Oscar does not call Imdb/RottenTomatoes/Netflix more than once across retries. This preserves API calls for upcoming jobs instead of exhausting more of them just to finish this particular job.
What excites me more is how easy this is to write in Elixir! The functions are super lean, with no if-else blocks to check whether we have already cached our progress:
defp get_rating(%{imdb: imdb, uid: uid} = metadata, "imdb", name) when is_float(imdb) do
  Logger.debug("Imdb rating for #{name} exists in cache", uid: uid)
  metadata
end

defp get_rating(metadata, "imdb", name) do
  Map.put(metadata, :imdb, Imdb.get_rating!(name))
end
These two clauses of the get_rating/3 function take care of fetching the data either from the cache or from the API. Since we store the metadata (the ratings so far) as a JSON string and decode it back into an Elixir map, it is very simple to pattern match against the map keys. If a key holds a floating point value, we have already fetched the rating for that particular portal (in this case, Imdb), so we can simply return the current metadata as is without calling the Imdb API.
Furthermore, if that pattern match fails, the fallback clause fetches the rating from the API and puts the value into the very key the first clause looks for on the next run (if any). 🎉
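The same idea boiled down to a toy example in iex:
iex> lookup = fn
...>   %{imdb: imdb} = metadata when is_float(imdb) -> {:cached, metadata}
...>   metadata -> {:fetch, metadata}
...> end
iex> lookup.(%{imdb: 4.9, uid: "abc"})
{:cached, %{imdb: 4.9, uid: "abc"}}
iex> lookup.(%{imdb: nil, uid: "abc"})
{:fetch, %{imdb: nil, uid: "abc"}}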
Ain't that cool! 💜
This is a minimal version of a bigger problem I helped a client solve recently. The app makes around 10-15 external service calls, most of which get blocked if we start hitting them aggressively. Since the data we query is the same across a particular job, we can always save the current progress and pick up from where we left off if we crash. But as I mentioned in the beginning, you don't always need Redis or a dedicated background worker with Elixir; you can achieve this very same behaviour with a GenServer, along the lines of the sketch below. As for me, the app was already using many of the good features of Exq, which needs Redis anyway.
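For completeness, here is a rough sketch of what that GenServer-only approach might look like, with the progress held in process state instead of Redis. The module name, the retry interval, and a public Oscar.save/2 are all assumptions of this sketch:
defmodule Oscar.RatingServer do
  use GenServer

  @retry_ms 60_000
  @portals [
    imdb: Oscar.Imdb,
    rt: Oscar.RottenTomatoes,
    sify: Oscar.Sify,
    netflix: Oscar.Netflix,
    amazon_prime: Oscar.AmazonPrime
  ]

  def start_link(name), do: GenServer.start_link(__MODULE__, name)

  @impl true
  def init(name) do
    send(self(), :fetch)
    {:ok, %{name: name, ratings: %{}}}
  end

  @impl true
  def handle_info(:fetch, %{name: name, ratings: ratings} = state) do
    # Try every portal we have not fetched yet; a portal that raises
    # (for example on a 429) is simply skipped until the next attempt.
    ratings =
      Enum.reduce(@portals, ratings, fn {key, client}, acc ->
        if Map.has_key?(acc, key) do
          acc
        else
          try do
            Map.put(acc, key, client.get_rating!(name))
          rescue
            _error -> acc
          end
        end
      end)

    if map_size(ratings) == length(@portals) do
      # Assumes a public Oscar.save/2 like the one in the first module.
      Oscar.save(name, ratings)
      {:stop, :normal, %{state | ratings: ratings}}
    else
      Process.send_after(self(), :fetch, @retry_ms)
      {:noreply, %{state | ratings: ratings}}
    end
  end
end
Each movie gets its own process here, and the ratings map in the state plays the same role that the Redis entry played above.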