
Rails Asynchronous Processing – DZone Web Dev

When I log into my bank account and want a report of all my account transactions for say, six months or a year, the web application says it received my request and asks me to check later to get the PDF report. After some time, I would be able to download the report. This is an example of asynchronous processing.

In this article, I describe the implementation of a simple asynchronous processing use case in Rails. I have a sample application called “mahrasa,” short for Mahboob Rails sample application, into which I have integrated the code.

Use Case

The user uploads a CSV file to the application. She gets a message saying the file is received and is being processed. A link is displayed where the user can check the status. In the backend, the file is processed asynchronously and post-processing the status is updated on the status page.

Design

Rails has many gems that enable asynchronous processing. Some of them are delayed_job, resque, sidekiq, and delayed.

I went with delayed as it is the newest kid on the block and advertises advanced features on its repository page, quoted below:

Delayed is a multi-threaded, SQL-driven ActiveJob backend used at Betterment to process millions of background jobs per day. 

It supports postgres, mysql, and sqlite, and is designed to be:

  • Reliable, with co-transactional job enqueues and guaranteed, at-least-once execution

  • Scalable, with an optimized pickup query and concurrent job execution

  • Resilient, with built-in retry mechanisms, exponential back-off, and failed job preservation

  • Maintainable, with robust instrumentation, continuous monitoring, and priority-based alerting

Why Delayed?

The delayed gem is a targeted fork of both delayed_job and delayed_job_active_record, combining them into a single library. It is designed for applications with the kinds of operational needs seen at Betterment, and includes numerous features extracted from Betterment’s codebases, such as:

  • Multithreaded job execution via concurrent-ruby

  • A highly optimized, SKIP LOCKED-based pickup query (on postgres)

  • Built-in instrumentation and continuous monitoring via a new monitor process

  • Named priority ranges, defaulting to :interactive, :user_visible, :eventual, and :reporting

  • Priority-based alerting thresholds for job age, run time, and attempts

  • An experimental autoscaling metric, for use by a horizontal autoscaler (we use Kubernetes)

  • A custom adapter that extends ActiveJob with Delayed-specific behaviors

Let me add a disclaimer here that I haven’t verified all the claims, so this article is not an endorsement that you should go with delayed in your application.

The installation steps for delayed are pretty simple:

  • Add the following to your Gemfile:
gem "delayed"

  • Run bundle install.
  • Create the table delayed_jobs:
$ rails generate delayed:migration
$ rails db:migrate

  • Add the following line to config/application.rb:
config.active_job.queue_adapter = :delayed

Inserting data into PostgreSQL with psql is blazingly fast. However, mahrasa uses an SQLite3 database, and the equivalent of psql in this case is the sqlite3 command-line tool itself. Before coding the job against SQLite3, I decided to check other data-insert methods as well and time them to get a sense of their relative performance. The options are:

  1. Insert the data row by row.
  2. Use csvsql to copy the file into the database.
  3. Bulk insert the rows using activerecord-import.
  4. Use SQLite3 to copy the file into the database.

For each option, I wrote an application job, whose details are given below:

ImportGdcJob

This job is the implementation of option 1. It reads the input CSV file in a loop and, for each line, inserts a row into the database by calling the create method of the model Gdc.

ImportGdcJob2

This job is the implementation of option 2. csvsql needs a header line with column names, and my CSV file does not have a header row. Therefore, this job first creates a temp.csv file whose first line holds the column names and then appends the entire input CSV file. It then runs csvsql to copy the file into the database. csvsql is part of a Python toolkit called csvkit.

ImportGdcJob3

This job is the implementation of option 3. It bulk inserts data with activerecord-import by invoking the import method on the model class Gdc.

ImportGdcJob4

This job is the implementation of option 4. It executes a system call to run sqlite3, passing it a script as input. The script creates a temp_table and imports the input CSV file data into it, then inserts the data from temp_table into global_daily_cumulative. Routing the data via temp_table takes care of automatic id generation in the primary-key column, which SQLite3's .import does not handle.
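A sketch of the kind of script the job feeds to sqlite3 (file and table names follow the article; the exact script in mahrasa may differ). Because temp_table has no id column, the final INSERT lets SQLite assign the auto-increment ids:

```ruby
# Script of SQL statements and sqlite3 dot-commands for option 4.
IMPORT_SCRIPT = <<~SCRIPT
  CREATE TEMP TABLE temp_table (
    "date" TEXT, "place" TEXT,
    "confirmed" INTEGER, "deaths" INTEGER, "recovered" INTEGER);
  .mode csv
  .import app/jobs/global_daily_cumulative.csv temp_table
  INSERT INTO global_daily_cumulative (date, place, confirmed, deaths, recovered)
    SELECT "date", "place", "confirmed", "deaths", "recovered" FROM temp_table;
SCRIPT

# The job would then run it with something like:
# File.write("import.sql", IMPORT_SCRIPT)
# system("sqlite3 db/development.sqlite3 < import.sql")
```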

The procedure for running and testing these jobs is as follows:

  • Run the steps, up to Create Table, in the How to Run section.
  • In ImportGdcJob4.rb, comment out the line:
AsyncOperation.where(id: job.arguments.first[:id]).update(:status => "processed")

  • In one terminal, start the job worker:
$ rake delayed:work

  • In another terminal, start the Rails console and enqueue the job by calling perform_later on its class:
$ rails c

> ImportGdcJob2.perform_later

The following screenshots show the output of running ImportGdcJob2 in Terminal 1 and Terminal 2, respectively.

Output of running ImportGdcJob2 in Terminal 1

Output of running ImportGdcJob2 in Terminal 2

Since ImportGdcJob inserts into the database line by line, I knew it would be awfully slow, so I ran it with only 1,000 rows. The execution times were in the expected order:

Job                          Time (Seconds)

ImportGdcJob [1,000 rows]           144.53
ImportGdcJob2                        58.43
ImportGdcJob3                         6.02
ImportGdcJob4                         4.91
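The article does not show its measurement code, but timings like these can be taken by running a job synchronously inside Benchmark.realtime. In this sketch, the job call is commented out and replaced by placeholder work so the snippet runs stand-alone:

```ruby
require "benchmark"

# Benchmark.realtime returns the wall-clock seconds the block took.
elapsed = Benchmark.realtime do
  # ImportGdcJob3.perform_now   # would run the real job inline
  sleep 0.05                    # placeholder work for this sketch
end

puts format("%.2f seconds", elapsed)
```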

Integrating Into Rails

Being the fastest, the fourth option is the preferred one. The job is called in the controller as an asynchronous operation, as shown in the following code block:

def import
    # copy the uploaded file to the app/jobs directory
    FileUtils.cp(File.new(params[:csvfile].tempfile),
                 "#{Rails.root}/app/jobs/global_daily_cumulative.csv")

    # insert an async_operations row to track the job's status
    @filename = params[:csvfile].original_filename
    @ao       = AsyncOperation.new(:op_type  => 'Import CSV',
                                   :filename => @filename,
                                   :status   => :enqueued)

    # enqueue the job
    if @ao.save
        ImportGdcJob4.perform_later(id: @ao.id)
    end

    render :ack
end

The view renders a link to check the status of the data insert job. 
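For completeness, here is a minimal stand-alone sketch of the status flip the job performs when it finishes, mirroring the AsyncOperation.where(...).update(...) line shown earlier. AsyncOperation and its relation are stubbed so the snippet runs without Rails:

```ruby
# Stub of the AsyncOperation model: every row starts as "enqueued", and
# .where returns a relation-like object whose update merges new attributes.
class AsyncOperation
  RECORDS = Hash.new { |h, k| h[k] = { status: "enqueued" } }

  Relation = Struct.new(:row) do
    def update(attrs)
      row.merge!(attrs)
    end
  end

  def self.where(id:)
    Relation.new(RECORDS[id])
  end
end

# What the end of the job's perform method does: mark the operation done,
# which is what the status page then displays.
AsyncOperation.where(id: 1).update(status: "processed")
```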

How To Run

  • Clone the repository:
$ git clone https://github.com/mh-github/mahrasa.git -b delayed1

  • Go into the project folder:
$ cd mahrasa

  • Make sure you have Ruby 3.1.2 installed and use it:
$ rvm install 3.1.2
$ rvm use 3.1.2

  • Install the gems and run the migrations:
$ bundle install
$ bin/rails db:migrate RAILS_ENV=development

  • Create table: Execute the following command from the SQLite3 prompt or within a database IDE like SQLite Browser or DBeaver.
CREATE TABLE "global_daily_cumulative" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, "date" TEXT, "place" TEXT, "confirmed" INTEGER, "deaths" INTEGER, "recovered" INTEGER);

  • Uncomment line: If you have commented out line #7 of ImportGdcJob4.rb to test the job, uncomment it.
  • Run the server:
$ bin/rails server

  • In another terminal, start the job worker:
$ cd mahrasa
$ rake delayed:work

  • Access the application in the browser at http://localhost:3000.
  • Click on the link “Upload global_daily_cumulative.csv.”
  • Click the button “Choose File.”
  • In the file explorer, navigate to the folder mahrasa/test and select the file global_daily_cumulative.csv.
  • You will see a message that the file has been received, along with a link to check the status of the job. Clicking the link takes you to the status page, which shows the current status of the job.

View when the job is enqueued:

View when the job is enqueued

View after the job is processed:

View after the job is processed

You can check in the database that the CSV file row count is the same as the record count in the table.

sqlite> select count(*) from global_daily_cumulative;

158987

Final Thoughts

The word “delayed” has an unfortunate negative connotation. When I first heard the term “delayed jobs,” I thought these were slow jobs suffering from inefficient code that had to be tuned at the server or database level, or even code reviewed. Later on, I realized what they actually were: just asynchronously executed jobs, with “delayed” used as an adjective because they ran on a library called “delayed_job.”

Try the available gems and time them against your own sample workloads; it may turn out that the speed difference among them is not critical for your use case. For really high-volume processing, you may eventually have to move on to RabbitMQ and, finally, Apache Kafka.

