Messaging Systems
The Queue is a powerful data structure which forms the foundation of many concurrent design patterns. Often, these design patterns center around passing messages between agents within the concurrent system. We will explore one of the simplest and most useful of these message-based patterns - the so-called “Task Queue”. Later, we may also look at the somewhat related “Publish-Subscribe” pattern (also sometimes referred to as “PubSub”).
Task Queue (or Work Queue)
In a task queue system,
Agents called “producers” write messages to a queue that describe work to be done.
A separate set of agents called “consumers” receive the messages and do the work. While work is being done, no new messages are received by the consumer.
Each message is delivered exactly once to a single consumer to ensure no work is “duplicated”.
Multiple consumers can be processing “work” messages at once, and similarly, 0 consumers can be processing messages at a given time (in which case, messages will simply queue up).
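The properties above can be sketched with nothing but the Python standard library. This is a minimal in-process illustration, not the Redis-based system built later in these notes: the `run_task_queue` function, the `STOP` sentinel, and the consumer names are all made up for the example, and threads stand in for separate consumer programs.

```python
import queue
import threading

def run_task_queue(num_consumers, messages):
    """Run a tiny in-process task queue; return (consumer name, message) pairs."""
    tasks = queue.Queue()
    handled = []
    lock = threading.Lock()
    STOP = object()                   # sentinel telling a consumer to exit

    def consumer(name):
        while True:
            msg = tasks.get()         # blocks while the queue is empty
            if msg is STOP:
                break
            with lock:
                handled.append((name, msg))

    workers = [threading.Thread(target=consumer, args=(f"consumer-{i}",))
               for i in range(num_consumers)]
    for w in workers:
        w.start()

    # The producer: describe work to be done by putting messages on the queue.
    for msg in messages:
        tasks.put(msg)
    # One sentinel per consumer so that every thread shuts down.
    for _ in workers:
        tasks.put(STOP)
    for w in workers:
        w.join()
    return handled

results = run_task_queue(num_consumers=3, messages=[f"job-{n}" for n in range(10)])
delivered = sorted(msg for _, msg in results)
print(delivered)
```

Which consumer handles which message varies from run to run, but every message appears exactly once in `results`, which is the delivery guarantee described above.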
The Task Queue pattern is a good fit for our jobs service.
Our Flask API will play the role of producer.
One or more “worker” programs will play the role of consumer.
Workers will receive messages about new jobs to execute and perform the analysis steps.
Task Queues in Redis
The HotQueue class provides two methods for creating a task queue consumer; the first is the .consume() method
and the second is the q.worker decorator.
The Consume Method
With a q object defined like q = HotQueue("some_queue", host="<Redis_IP>", port=6379, db=1),
the consume method works as follows:
The q.consume() method returns an iterator which can be looped over using a for loop (much like a list).
The q.consume() method blocks (i.e., waits indefinitely) when there are no additional messages in the queue named some_queue.
The basic syntax of the consume method is this:
for item in q.consume():
    # do something with item
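To see the "iterator that blocks" behavior without a Redis server, here is a rough stand-in built on the standard library. `LocalQueue` is a hypothetical class written for this sketch, not part of HotQueue, and the `timeout` argument is added only so the example terminates instead of waiting forever.

```python
import queue

class LocalQueue:
    """Hypothetical in-process stand-in for a Redis-backed queue."""

    def __init__(self):
        self._items = queue.Queue()

    def put(self, item):
        self._items.put(item)

    def __len__(self):
        return self._items.qsize()

    def consume(self, timeout=None):
        """Yield items in FIFO order, waiting up to `timeout` seconds for each.

        With timeout=None the generator waits indefinitely for the next item.
        """
        while True:
            try:
                yield self._items.get(block=True, timeout=timeout)
            except queue.Empty:
                return   # only reachable when a finite timeout expires

q = LocalQueue()
for word in ["alpha", "beta", "gamma"]:
    q.put(word)

seen = []
for item in q.consume(timeout=0.1):
    seen.append(item)
print(seen, len(q))
```

The loop body runs once per queued item, in the order the items were added, and the queue is empty afterwards.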
Exercises. Complete the following:
Start/scale two python debug containers with redis and hotqueue installed (you can use the jstubbs/redis-client image if you prefer). In two separate shells, exec into each debug container and start ipython.
In each terminal, create a HotQueue object pointing to the same Redis queue.
In the first terminal, add three or four Python strings to the queue; check the length of the queue.
In the second terminal, use a for loop and the .consume() method to print objects in the queue to the screen.
Observe that the strings are printed out in the second terminal.
Back in the first terminal, check the length of the queue; add some more objects to the queue.
Confirm the newly added objects are “instantaneously” printed to the screen back in the second terminal.
The q.worker Decorator
Given a HotQueue queue object, q, the q.worker decorator is a convenience utility to turn a function into a consumer
without having to write the for loop. The basic syntax is:
@q.worker
def do_work(item):
    # do something with item
In the example above, item will be populated with the item dequeued.
Then, to start consuming messages, simply call the function:
>>> do_work()
# ... blocks until new messages arrive
Note
The @q.worker decorator replaces the for loop. Once you call a function decorated with @q.worker, the
code never returns unless there is an unhandled exception.
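One way to picture what such a decorator does is the sketch below. It is an illustration of the idea only, not HotQueue's actual implementation: the `worker` function, its `timeout` parameter, and the `pending` queue are assumptions made for this example, and the timeout exists only so the demo returns.

```python
import functools
import queue

def worker(source, timeout=None):
    """Hypothetical decorator: call the wrapped function once per queued item."""
    def decorator(func):
        @functools.wraps(func)
        def run():
            while True:
                try:
                    item = source.get(block=True, timeout=timeout)
                except queue.Empty:
                    return          # only reachable with a finite timeout
                func(item)
        return run
    return decorator

pending = queue.Queue()
for n in (1, 2, 3):
    pending.put(n)

squares = []

@worker(pending, timeout=0.1)
def record_square(item):
    squares.append(item * item)

record_square()   # called with no arguments, like do_work() above
print(squares)
```

The decorated function is written to accept one item, but it is called with no arguments; the wrapper supplies each item from the queue.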
Exercise. Write a function, echo(item), to print an item to the screen, and use the q.worker decorator to
turn it into a consumer. Call your echo function in one terminal and in a separate terminal, send messages to the
redis queue. Verify that the message items are printed to the screen in the first terminal.
In practice, we will use the @q.worker decorator in a Python source file like so:
# A simple example of a Python source file, worker.py
from hotqueue import HotQueue

q = HotQueue("some_queue", host="<Redis_IP>", port=6379, db=1)

@q.worker
def do_work(item):
    # do something with item...
    pass

do_work()
Assuming the file above was saved as worker.py, calling python worker.py from the shell would result in a
non-terminating program that “processed” the items in the "some_queue" queue using the do_work(item) function.
The only thing that would cause our worker to stop is an unhandled exception.
Concurrency in the Jobs API
Recall that our big-picture goal is to add a Jobs endpoint to our Flask system that can process long-running tasks. We will implement our Jobs API with concurrency in mind. The goals will be:
Enable analysis jobs that take longer to run than the request/response cycle (typically, a few seconds or less).
Deploy multiple “worker” processes to enable more throughput of jobs.
The overall architecture will thus be:
a) Save the request in a database and respond to the user that the analysis will eventually be run.
b) Give the user a unique identifier with which they can check the status of their job and fetch the results when they are ready.
c) Queue the job to run so that a worker can pick it up and run it.
d) Build the worker to actually work the job.
Parts a), b) and c) are the tasks of the Flask API, while part d) will be a worker, running as a separate pod/container, that is waiting for new items in the Redis queue.
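The flow through parts a) to d) can be sketched in a single process before any Redis or Flask code is written. Everything here is a stand-in chosen for illustration: a dictionary plays the database, `queue.Queue` plays the Redis queue, and `submit_job` and `work_one_job` are hypothetical names, not functions from the final system.

```python
import queue
import uuid

job_db = {}                 # stand-in for the Redis database
job_queue = queue.Queue()   # stand-in for the Redis queue

def submit_job(start, end):
    """Parts a) to c): save the request, hand back an id, queue the job."""
    jid = str(uuid.uuid4())
    job_db[jid] = {"id": jid, "status": "submitted", "start": start, "end": end}
    job_queue.put(jid)
    return job_db[jid]

def work_one_job():
    """Part d): take one job id off the queue and run it."""
    jid = job_queue.get()
    job_db[jid]["status"] = "in progress"
    # ... the real analysis would run here ...
    job_db[jid]["status"] = "complete"
    return jid

job = submit_job(start=1, end=10)
status_before = job_db[job["id"]]["status"]
finished = work_one_job()
status_after = job_db[finished]["status"]
print(status_before, "->", status_after)
```

Note that only the job id travels through the queue; the job's details live in the database, where both the API and the worker can read and update them.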
Code Organization
As software systems get larger, it is very important to keep code organized so that finding the functions, classes, etc. responsible for different behaviors is as easy as possible. To some extent, this is technology-specific, as different languages, frameworks, etc., have different rules and conventions about code organization. We’ll focus on Python, since that is what we are using.
The basic unit of code organization in Python is called a “module”. This is just a Python source file (ends in a .py
extension) with variables, functions, classes, etc., defined in it. We’ve already used a number of modules, including
modules that are part of the Python standard library (e.g. json) and modules that are part of third-party libraries
(e.g., redis).
The following should be kept in mind when designing the modules of a larger system:
Modules should be focused, with specific tasks or functionality in mind, and their names (preferably, short) should match their focus.
Modules are also the most typical entry-point for the Python interpreter itself (e.g., python some_module.py).
Accessing code from external modules is accomplished through the import statement.
Circular imports can cause errors: if module A imports an object from module B, module B should not import from module A.
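The circular import problem is easy to reproduce. The sketch below writes two throwaway modules into a temporary directory and imports one of them; the names `mod_a`, `mod_b`, `a_value`, and `b_value` are made up for the demonstration.

```python
import importlib
import os
import sys
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    # mod_a needs a name from mod_b before it has defined a_value ...
    with open(os.path.join(tmp, "mod_a.py"), "w") as f:
        f.write("from mod_b import b_value\na_value = 1\n")
    # ... and mod_b needs a_value from the half-finished mod_a.
    with open(os.path.join(tmp, "mod_b.py"), "w") as f:
        f.write("from mod_a import a_value\nb_value = 2\n")

    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
        importlib.import_module("mod_a")
        outcome = "imported"
    except ImportError as exc:
        outcome = type(exc).__name__
        print(exc)
    finally:
        sys.path.remove(tmp)
        sys.modules.pop("mod_a", None)
        sys.modules.pop("mod_b", None)

print(outcome)
```

The failure happens because `mod_b` asks for `a_value` while `mod_a` is still only partially initialized. Moving shared objects into a third module that both can import avoids the cycle.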
Examples. The Python standard library is a good source of examples of module design. The documentation for the Python 3.9 standard library lists each module with a short description.
We see the Python standard library has modules focused on a variety of computing tasks; for example, for working with different data types, such as the datetime module and the array module. The descriptions are succinct:
The datetime module supplies classes for manipulating dates and times.
This module defines an object type which can compactly represent an array of basic values: characters, integers, floating point numbers.
For working with various file formats: e.g., csv, configparser.
For working with concurrency: threading, multiprocessing, etc.
With this in mind, a first approach might be to break up our system into two modules:
api.py - this module contains the flask web server.
worker.py - this module contains the code to execute jobs.
However, both the API server and the workers will need to interact with the database and the queue:
The API will create new jobs in the database, put new jobs onto the queue, and retrieve the status of jobs (and probably the output products of the job).
The worker will pull jobs off the queue, retrieve jobs from the database, and update them.
This suggests a different structure:
api.py - this module contains the flask web server.
jobs.py - this module contains core functionality for working with jobs in Redis (and on the queue).
worker.py - this module contains the code to execute jobs.
Common code for working with redis/hotqueue can go in the jobs.py module and be imported in both api.py
and worker.py.
Note
High-quality modular design is a crucial aspect of building good software. It requires significant thought and experience to do correctly, and when done poorly it can have dire consequences. In the best case, poor module design can make the software difficult to maintain/upgrade; in the worst case, it can prevent it from running correctly at all.
Private vs Public Objects
As software projects grow, the notion of public and private access points (functions, variables, etc.) becomes an increasingly important part of code organization.
Private objects should only be used within the module in which they are defined. If a developer needs to change the implementation of a private object, she only needs to make sure the changes work within the existing module.
Public objects can be used by external modules. Changes to public objects need more careful analysis to understand the impact across the system.
Like the layout of code itself, this topic is technology-specific. In this class, we will take a simplified approach based on our use of Python. Remember, this is a simplification to illustrate the basic concepts - in practice, more advanced/robust approaches are used.
We will name private objects starting with a single underscore (_) character.
If an object does not start with an underscore, it should be considered public.
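The underscore convention is mostly an agreement between developers, but Python does honor it in one place: a wildcard import skips names that start with an underscore (when the module defines no `__all__`). The sketch below builds a throwaway module in memory to show this; the module name `demo_jobs` and its two functions are invented for the example.

```python
import sys
import types

source = '''
def _generate_jid():
    return "private-helper"

def add_job():
    return _generate_jid()
'''

# Build a throwaway module named demo_jobs (a made-up name) from the source above.
demo = types.ModuleType("demo_jobs")
exec(source, demo.__dict__)
sys.modules["demo_jobs"] = demo

# Run a wildcard import in a fresh namespace and see which names arrive.
namespace = {}
exec("from demo_jobs import *", namespace)

public_names = sorted(name for name in namespace if not name.startswith("__"))
print(public_names)
print(namespace["add_job"]())
```

Only `add_job` is imported, yet it can still call `_generate_jid` internally. Nothing stops other code from reaching `demo._generate_jid` directly, so the privacy is a convention rather than an enforced rule.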
We can see public and private objects in use within the standard library as well. If we open up the source code for the
datetime module, which can be found on GitHub, we see a mix
of public and private objects and methods.
Private objects are listed first.
Public objects start on line 473 with the timedelta class.
Exercise. Create three files, api.py, worker.py and jobs.py in your local repository, and update
them by working through the following example.
Here are some function and variable definitions, some of which have incomplete implementations and/or have invalid syntax.
To begin, place them in the appropriate files. Also, determine if they should be public or private.
def _generate_jid():
    return str(uuid.uuid4())

app = Flask(__name__)

def _generate_job_key(jid):
    return 'job.{}'.format(jid)

q = HotQueue("queue", host='172.17.0.1', port=6379, db=1)

def _instantiate_job(jid, status, start, end):
    if type(jid) == str:
        return {'id': jid,
                'status': status,
                'start': start,
                'end': end
                }
    return {'id': jid.decode('utf-8'),
            'status': status.decode('utf-8'),
            'start': start.decode('utf-8'),
            'end': end.decode('utf-8')
            }

@app.route('/jobs', methods=['POST'])
def jobs_api():
    try:
        job = request.get_json(force=True)
    except Exception as e:
        # return the error message together with a 400 (Bad Request) status code
        return json.dumps({'status': "Error", 'message': 'Invalid JSON: {}.'.format(e)}), 400
    return json.dumps(jobs.add_job(job['start'], job['end']))

def _save_job(job_key, job_dict):
    """Save a job object in the Redis database."""
    rd.hmset(.......)

def _queue_job(jid):
    """Add a job to the redis queue."""
    ....

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0')

def add_job(start, end, status="submitted"):
    """Add a job to the redis queue."""
    jid = _generate_jid()
    job_dict = _instantiate_job(jid, status, start, end)
    _save_job(......)
    _queue_job(......)
    return job_dict

@<...>   # fill in
def execute_job(jid):
    # fill in ...

rd = redis.StrictRedis(host='172.17.0.1', port=6379, db=0)

def update_job_status(jid, status):
    """Update the status of job with job id `jid` to status `status`."""
    # use separate names for the stored values so the arguments are not overwritten
    job_id, old_status, start, end = rd.hmget(_generate_job_key(jid), 'id', 'status', 'start', 'end')
    job = _instantiate_job(job_id, old_status, start, end)
    if job:
        job['status'] = status
        _save_job(_generate_job_key(jid), job)
    else:
        raise Exception()
Solution. We start by recognizing that app = Flask(__name__) is the instantiation of a Flask app, the @app.route
is a flask decorator for defining an endpoint in the API, and the app.run line is used to launch the flask server,
so we add all three to the api.py file:
# api.py
app = Flask(__name__)

@app.route('/jobs', methods=['POST'])
def jobs_api():
    try:
        job = request.get_json(force=True)
    except Exception as e:
        # return the error message together with a 400 (Bad Request) status code
        return json.dumps({'status': "Error", 'message': 'Invalid JSON: {}.'.format(e)}), 400
    return json.dumps(jobs.add_job(job['start'], job['end']))

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0')
We also recognize several functions are private by the leading _ in the name. They are:
_generate_jid
_generate_job_key
_instantiate_job
_save_job
_queue_job
These all have to do with jobs and are used (either directly or indirectly) by the add_job function. One more hint
is that the jobs_api() function, which we just put in api.py, actually references jobs.add_job, so we can
put these in the jobs.py file:
# jobs.py
def _generate_jid():
    return str(uuid.uuid4())

def _generate_job_key(jid):
    return 'job.{}'.format(jid)

def _instantiate_job(jid, status, start, end):
    if type(jid) == str:
        return {'id': jid,
                'status': status,
                'start': start,
                'end': end
                }
    return {'id': jid.decode('utf-8'),
            'status': status.decode('utf-8'),
            'start': start.decode('utf-8'),
            'end': end.decode('utf-8')
            }

def _save_job(job_key, job_dict):
    """Save a job object in the Redis database."""
    rd.hmset(.......)

def _queue_job(jid):
    """Add a job to the redis queue."""
    ....

def add_job(start, end, status="submitted"):
    """Add a job to the redis queue."""
    jid = _generate_jid()
    job_dict = _instantiate_job(jid, status, start, end)
    _save_job(......)
    _queue_job(......)
    return job_dict
That leaves the definition of the q = HotQueue(..), rd = StrictRedis(..), update_job_status() and execute_job().
We know worker.py is responsible for actually executing the job, so execute_job should go there.
The update_job_status() function is a jobs-related task, so it goes in the jobs.py file; it also makes a call to _instantiate_job, which is already in jobs.py.
The jobs.py file definitely needs access to the rd object, so that goes there.
Lastly, the q object will be needed by both jobs.py and worker.py, but worker.py is already importing from jobs, so we had better put it in jobs.py as well.
Therefore, the final solution is:
# api.py
app = Flask(__name__)

@app.route('/jobs', methods=['POST'])
def jobs_api():
    try:
        job = request.get_json(force=True)
    except Exception as e:
        # return the error message together with a 400 (Bad Request) status code
        return json.dumps({'status': "Error", 'message': 'Invalid JSON: {}.'.format(e)}), 400
    return json.dumps(jobs.add_job(job['start'], job['end']))

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0')
# jobs.py
q = HotQueue("queue", host='172.17.0.1', port=6379, db=1)
rd = redis.StrictRedis(host='172.17.0.1', port=6379, db=0)

def _generate_jid():
    return str(uuid.uuid4())

def _generate_job_key(jid):
    return 'job.{}'.format(jid)

def _instantiate_job(jid, status, start, end):
    if type(jid) == str:
        return {'id': jid,
                'status': status,
                'start': start,
                'end': end
                }
    return {'id': jid.decode('utf-8'),
            'status': status.decode('utf-8'),
            'start': start.decode('utf-8'),
            'end': end.decode('utf-8')
            }

def _save_job(job_key, job_dict):
    """Save a job object in the Redis database."""
    rd.hmset(.......)

def _queue_job(jid):
    """Add a job to the redis queue."""
    ....

def add_job(start, end, status="submitted"):
    """Add a job to the redis queue."""
    jid = _generate_jid()
    job_dict = _instantiate_job(jid, status, start, end)
    _save_job(......)
    _queue_job(......)
    return job_dict

def update_job_status(jid, status):
    """Update the status of job with job id `jid` to status `status`."""
    # use separate names for the stored values so the arguments are not overwritten
    job_id, old_status, start, end = rd.hmget(_generate_job_key(jid), 'id', 'status', 'start', 'end')
    job = _instantiate_job(job_id, old_status, start, end)
    if job:
        job['status'] = status
        _save_job(_generate_job_key(jid), job)
    else:
        raise Exception()
# worker.py
@<...>   # fill in
def execute_job(jid):
    # fill in ...
Exercise. After placing the functions in the correct files, add the necessary import statements.
Solution. Let’s start with api.py. We know we need to import the Flask class to create the app object and to
use the flask request object. We also use the json package from the standard library. Finally, we are using our
own jobs module.
# api.py
import json
from flask import Flask, request
import jobs

app = Flask(__name__)

@app.route('/jobs', methods=['POST'])
def jobs_api():
    try:
        job = request.get_json(force=True)
    except Exception as e:
        # return the error message together with a 400 (Bad Request) status code
        return json.dumps({'status': "Error", 'message': 'Invalid JSON: {}.'.format(e)}), 400
    return json.dumps(jobs.add_job(job['start'], job['end']))

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0')
For jobs.py, there is nothing from our own code to import (which is good, since the other modules will be importing
from it), but we do need to import the StrictRedis and HotQueue classes. Also, don’t forget the use of the
uuid module from the standard lib! So, jobs.py becomes:
# jobs.py
import uuid
from hotqueue import HotQueue
from redis import StrictRedis

q = HotQueue("queue", host='172.17.0.1', port=6379, db=1)
rd = StrictRedis(host='172.17.0.1', port=6379, db=0)

def _generate_jid():
    return str(uuid.uuid4())

def _generate_job_key(jid):
    return 'job.{}'.format(jid)

def _instantiate_job(jid, status, start, end):
    if type(jid) == str:
        return {'id': jid,
                'status': status,
                'start': start,
                'end': end
                }
    return {'id': jid.decode('utf-8'),
            'status': status.decode('utf-8'),
            'start': start.decode('utf-8'),
            'end': end.decode('utf-8')
            }

def _save_job(job_key, job_dict):
    """Save a job object in the Redis database."""
    rd.hmset(.......)

def _queue_job(jid):
    """Add a job to the redis queue."""
    ....

def add_job(start, end, status="submitted"):
    """Add a job to the redis queue."""
    jid = _generate_jid()
    job_dict = _instantiate_job(jid, status, start, end)
    _save_job(......)
    _queue_job(......)
    return job_dict

def update_job_status(jid, status):
    """Update the status of job with job id `jid` to status `status`."""
    # use separate names for the stored values so the arguments are not overwritten
    job_id, old_status, start, end = rd.hmget(_generate_job_key(jid), 'id', 'status', 'start', 'end')
    job = _instantiate_job(job_id, old_status, start, end)
    if job:
        job['status'] = status
        _save_job(_generate_job_key(jid), job)
    else:
        raise Exception()
Finally, on the surface it doesn’t appear that the worker needs to import anything, but we know it needs the q
object to get items. It’s hidden by the missing decorator. Let’s go ahead and import it:
# worker.py
from jobs import q

@<...>   # fill in
def execute_job(jid):
    # fill in ...
Exercise. Write code to finish the implementations for _save_job and _queue_job.
Solution. The _save_job function should save the job to the database, while the _queue_job function
should put it on the queue. We know how to write those:
def _save_job(job_key, job_dict):
    """Save a job object in the Redis database."""
    rd.hmset(job_key, job_dict)

def _queue_job(jid):
    """Add a job to the redis queue."""
    q.put(jid)
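To try these two functions without a running Redis server, they can be exercised against stand-ins. In the sketch below, `FakeRedis` is a hypothetical class written for this example: it stores hash fields as bytes, which is how a Redis client created without `decode_responses=True` hands values back, and `queue.Queue` stands in for the HotQueue object. The job id `abc123` is made up.

```python
import queue

class FakeRedis:
    """Hypothetical stand-in that stores hash fields as bytes."""

    def __init__(self):
        self._hashes = {}

    def hmset(self, name, mapping):
        stored = self._hashes.setdefault(name, {})
        for field, value in mapping.items():
            stored[field] = str(value).encode('utf-8')

    def hmget(self, name, *fields):
        stored = self._hashes.get(name, {})
        return [stored.get(field) for field in fields]

rd = FakeRedis()        # stand-in for the StrictRedis client
q = queue.Queue()       # stand-in for the HotQueue object

def _save_job(job_key, job_dict):
    """Save a job object in the (fake) database."""
    rd.hmset(job_key, job_dict)

def _queue_job(jid):
    """Add a job id to the (fake) queue."""
    q.put(jid)

job = {'id': 'abc123', 'status': 'submitted', 'start': '1', 'end': '10'}
_save_job('job.abc123', job)
_queue_job('abc123')

raw = rd.hmget('job.abc123', 'id', 'status', 'start', 'end')
decoded = [value.decode('utf-8') for value in raw]
print(raw)
print(decoded)
```

The bytes in `raw` are the reason `_instantiate_job` has a branch that calls `.decode('utf-8')` on each field. Separately, recent releases of the redis Python package mark `hmset` as deprecated in favor of `hset(name, mapping=...)`, so the real `_save_job` may emit a deprecation warning depending on the installed version.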
Exercise. Fix the calls to _save_job and _queue_job within the add_job function.
Solution. The issue in each of these is the missing parameters. The _save_job function takes job_key, job_dict, so
we just need to pass those in. Similarly, _queue_job takes jid, so we pass that in. The add_job function
thus becomes:
def add_job(start, end, status="submitted"):
    """Add a job to the redis queue."""
    jid = _generate_jid()
    job_dict = _instantiate_job(jid, status, start, end)
    # update call to _save_job:
    _save_job(_generate_job_key(jid), job_dict)
    # update call to _queue_job:
    _queue_job(jid)
    return job_dict
Exercise. Finish the execute_job function. This function needs a decorator (which one?)
and it needs a function body.
The function body needs to:
update the status at the start (to something like “in progress”).
update the status when finished (to something like “complete”).
For the body, we will use the following (incomplete) simplification:
update_job_status(jid, .....)
# todo -- replace with real job.
time.sleep(15)
update_job_status(jid, .....)
Solution.
As we saw earlier, we can use the q.worker decorator to turn the worker into a consumer.
As for execute_job itself, we are given the body; we just need to fix the calls to the update_job_status()
function. The first call puts the job “in progress” while the second sets it to “complete”. So the function becomes:
@q.worker
def execute_job(jid):
    update_job_status(jid, "in progress")
    time.sleep(15)
    update_job_status(jid, "complete")
Note that we are using the update_job_status function from jobs.py now, so we need to import it, along with the time module from the standard library.
The final worker.py is thus:
# worker.py
import time
from jobs import q, update_job_status

@q.worker
def execute_job(jid):
    update_job_status(jid, 'in progress')
    time.sleep(15)
    update_job_status(jid, 'complete')

# start consuming job ids from the queue; this call blocks
execute_job()