Computers, Software

Moving Data in AWS

I have been a long-time user of Amazon Web Services, but I only recently started using their Data Pipeline service to handle ETL needs. The service provides a cloud-ready, low-cost, turnkey (in some cases) solution for moving data between your services. I had a difficult time getting up and running, partly due to the lack of discussion online about this service, so I want to share my experience, offer some best practices, and walk through how I developed our pipelines.

Anyone with an AWS account can use Data Pipeline. But be careful: there is no BAA agreement if you are in the healthcare industry and are passing around PHI data. Fortunately, our needs do not yet require us to move PHI around.

The first step in my ETL journey was formalizing what data needed to go where. Before I even opened up the Data Pipeline service, I had to understand our specific needs. I identified two use cases.

Archiving Old Data

RDS instances have a maximum allowable database size, and ours was getting full. The approach I took was to look at our largest tables and focus on reducing those first. I ran some queries to understand what was using the most space: http://stackoverflow.com/a/9620273/802407
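If you want a quick look without clicking through, a query along these lines (similar to the one in that StackOverflow answer) surfaces the largest tables; adjust the LIMIT to taste:

-- approximate on-disk size per table, largest first
SELECT table_schema,
       table_name,
       ROUND((data_length + index_length) / 1024 / 1024, 2) AS total_mb,
       ROUND(data_length / 1024 / 1024, 2)  AS data_mb,
       ROUND(index_length / 1024 / 1024, 2) AS index_mb
FROM information_schema.TABLES
ORDER BY (data_length + index_length) DESC
LIMIT 20;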

Once I had a list of my largest tables, I could classify them and assign retention rules. Some tables I decided to leave in place; others held transient data and could be archived. (HIPAA mandates a 7-year data retention policy, so no luck just deleting.) We decided as a business that different classifications could live within our application for differing timeframes. Once the timeframes were established, I could write a data pipeline and move any data older than the cutoff date for that table to a storage solution outside of the database. We chose to house the MySQL backups on S3 in encrypted buckets.

Fortunately the Data Pipeline service provides a number of templates to help you get started. Navigate to https://console.aws.amazon.com/datapipeline . I found the templates a good starting point, but there are some frustrations that I will mention below in the “Quirks” section. Click “Create Pipeline”. I used the template “Full Copy of RDS MySQL table to S3”, filled in the parameters, and edited the pipeline in “Edit in Architect” mode.

Since I wanted to archive old data, I modified the SqlDataNode’s Select Query to be only records older than my retention policy:

select * from #{table} 
where #{myRDSTableLastModifiedCol} <= 
date_sub('#{format(@scheduledStartTime, 'YYYY-MM-dd HH-mm-ss')}' , 
  interval 6 month)

This selects only records older than 6 months as of the pipeline's scheduled start time. The template then moves these to S3. There are two parameters defined in the parameters section: “#{myRDSTableName}” and “#{myRDSTableLastModifiedCol}”. I supplied my table name and the updated_at datetime column for my records.

I added a new SqlActivity named “Delete old records” that depends on the CopyActivity. Once the records are in S3, I want to delete them from the database table. The activity has “Depends on: RDSToS3CopyActivity”, so if saving to S3 fails, the records are left untouched. I added the following script, which mirrors the select query above but deletes the records instead:

delete from #{myRDSTableName} 
where #{myRDSTableLastModifiedCol} <= 
date_sub('#{format(@scheduledStartTime, 'YYYY-MM-dd HH-mm-ss')}' , 
  interval 6 month)

I would recommend running this against a test database first, so you don't delete production records while testing your script. Because the timestamp is the same as in the select, this targets the same record set, provided you have an updated_at column that shows when a record was last touched.

Expose Data for Reporting

My other use case was getting data to our reporting server, which runs on Redshift. Again, there is a nice template to get started. Click “Create Pipeline”, select “Incremental copy of RDS MySQL table to Redshift”, then “Edit in Architect”.

The first run needs to be a full copy if I want all of the data in the table. After that, I can use delta copies to move over only the new data fitting my criteria. This is driven by a SQL select query, so it is easy to modify. In the SqlDataNode I can edit the Select Query to my liking. For the first run I removed the conditions from the query to get all records, changed the instance type to something more substantial (t1.micro to m1.small), and upped the timeout from 2 hours to 8 hours. Before the next run I modified the query to put back the conditions that select the delta data, then downgraded the instance type and timeout to their smaller defaults.
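For reference, the delta condition I put back looked roughly like this. This is a sketch of the shape of it, not the template's exact expression, reusing the same #{} parameters as the archive query above:

select * from #{table}
where #{myRDSTableLastModifiedCol} >=
  '#{format(@scheduledStartTime, 'YYYY-MM-dd HH-mm-ss')}'
and #{myRDSTableLastModifiedCol} <
  '#{format(@scheduledEndTime, 'YYYY-MM-dd HH-mm-ss')}'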

I then ran into an infrastructure quirk: our Redshift instance was inside a VPC, and our RDS database was on a classic (non-VPC) instance. This meant a single EC2 instance could not talk to both databases, since it had to live on one side or the other. Because of this limitation, I had to modify the parts of the pipeline that assumed one EC2 instance would be talking to both databases. Note that I had to edit the JSON, as the Data Pipeline UI does not allow changing the resources that activities run on from the template. I created two EC2 instances: one for talking to RDS and S3, and one for talking to S3 and Redshift.

In an attempt to make things easier, Amazon provides some Python scripts that are called under the hood to reflect on your MySQL table structure and convert it to a PostgreSQL CREATE TABLE command. This didn't work for me because of my VPC permissions issues, so I provided my own CREATE TABLE SQL in the S3StagingDataNode. It was still generated using Amazon's Python script, but I supplied the inputs manually:

#!/bin/bash
aws s3 cp s3://datapipeline-us-east-1/sample-scripts/mysql_to_redshift.py \
  /tmp/mysql_to_redshift.py

for target in table1 table2 table3
do
  mysqldump -h <host> --port=3306 -u <username> --password=<password> \
    --compatible=postgresql --default-character-set=utf8 -n -d \
    -r "/tmp/${target}_structure.sql" <database> $target

  python /tmp/mysql_to_redshift.py --input_file=/tmp/${target}_structure.sql \
    --output_file=/tmp/${target}_create.psql --table_name=$target \
    --insert_mode=KEEP_EXISTING
done

This Bash script pulls down the public mysql_to_redshift.py file, then loops over the target tables you want to set up pipelines for. For each table (table1, table2, table3, etc.) it does a mysqldump of the table structure, then feeds that structure file into the Python conversion utility to produce the PostgreSQL version of the table structure. The contents of the table1_create.psql file are what I copied into the “Create Table Sql” field in the Data Pipeline UI.

Note that the “Create Table Sql” is interpreted literally and has no schema context in Redshift. Therefore, if I want to create the table in another schema, the CREATE TABLE contents need to be modified to prefix the table name with a schema qualifier, e.g. “table1” would become “staging.table1” (without the quotes). The Python utility will double quote the table name if given a name containing a period, which incorrectly creates a table in the public schema: public.”staging.table1”, which is probably not what you want. Check the contents of the CREATE TABLE for accuracy.
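As an illustration, the only edit is qualifying the table name by hand; the column definitions below are hypothetical stand-ins for whatever the script generates for your table:

CREATE TABLE staging.table1 (
  id INTEGER NOT NULL,
  updated_at TIMESTAMP,
  PRIMARY KEY (id)
);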

I also changed the “Insert Mode” in the S3ToRedshiftCopyActivity to OVERWRITE_EXISTING. This uses the primary key of the table to detect duplicate rows. Since we might modify existing records, I wanted those records replaced in Redshift when they are modified in the application.

Quirks

The Data Pipeline service has a number of quirks that I stumbled upon. I hope Amazon works to refine the service, and that one day these are no longer issues. But for now I observed the following:

  • I cannot add everything via the UI. Things like parameters and EC2 resources can only be added by editing the JSON. Don’t be afraid to open it up; it is reasonably easy to follow. Hopefully support for adding these objects will come to the UI in the future.
  • The default templates are a great place to understand the pipelines, but they are very messy. The JSON definitions are painful to manage. Some information is stored in parameters, other values are inlined. Some activities cannot have parameterized variables. Sometimes the parameter names MUST start with “my”, e.g. “myDatabaseUsername”; I found this arbitrary and frustrating. Some parameters have a “watermark” value or a “help” value, others don’t. At least one variable started with a “*” character, with no explanation why.
  • When testing a pipeline I cannot switch between a scheduled pipeline and an on-demand pipeline. I have to export the JSON definition and create a new pipeline as the other type. This makes testing painful.
  • The “Execution Details” screen is hard to interpret. The filter defaults to “Activities” only, but all of my pipeline definitions start with an EC2 resource being instantiated, which is filtered out. The timeframes are also strange: I needed to change the “Schedule Interval” to “Executed start” while testing an on-demand pipeline. Finally, the dates need changing from the default. The screen defaults to the last 2 days and will include a lot of previous test runs if you are developing. The runs don’t seem to be sorted in any logical way either, making it difficult to trace a pipeline execution at a glance.
  • While debugging, check S3 for the logs. I found log data in S3 that was not referenced at all in the Data Pipeline UI. This was critical for understanding failures.
  • The visualization in Architect mode is not particularly helpful. The only thing I can do is click on a node and see its properties on the right; I cannot view two nodes’ properties at once. Worse, the parameters are in a completely different section, so I can only see the variable name or the variable value at any one time. I found the visualization useful only for seeing the flow of execution. For editing, I would export the data pipeline to JSON and modify it in a text editor outside of the UI.

Conclusion

The Data Pipeline service offers an affordable way to automate ETLs in a cloud-friendly way. The proof of concept that I did underrepresented the amount of work it would take to get these running in production. I frequently battled with permissions, UI quirks, and timeouts. I was persistent and eventually got these pipelines running. Once running, I stripped out the passwords and committed the JSON exports into version control for peace of mind.

I want to look into SNS failure reporting next, to be alerted when pipelines fail. I am most concerned with upstream changes to the table schemas; if these don’t match the downstream schema, the pipelines will fail. To guard against this I wrote unit tests that inspect the tables and ensure no changes have been introduced upstream. If a change is introduced, the test fails with a message describing what it expected. This should prompt the developer to alert the data services team so they can modify the pipeline downstream to prepare for the upstream change. This policy has not yet been tested.
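The schema check itself can be as simple as a query against information_schema that the test compares to a hard-coded expectation. A hypothetical sketch (the database and table names are placeholders):

SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'mydb'
  AND table_name   = 'table1'
ORDER BY ordinal_position;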

Hardware, Linux, Open-source, Ruby, Software, Uncategorized

Delayed Job Performance Tuning

We found a bug. A bug that affected a lot of historical records that we now have the pleasure of reprocessing. Fortunately we already had an async job infrastructure in place with the delayed_job gem. Unfortunately, this gem is intended for fairly small batches of records and isn’t tuned to handle 1M+ records in the delayed_jobs table.

After reading some best practices we decided on a queue-based approach. To keep our day-to-day async jobs running we would use a “default” queue, and to reprocess our old records we used a new queue. Starting the workers up with a “--queue” flag did the trick. We had hardware dedicated to day-to-day operations, and new hardware dedicated to the new queue. Now it was simply a matter of filling up the queue with the jobs to reprocess the records.

Our initial approach maxed out the CPU on our database server. This was largely due to us not having tuned the SQL in our async jobs. Because the volume we processed had always been low, it was never really a noticeable problem; but when we threw lots of new jobs into the queues, it became very noticeable. The workers would start up, then mysteriously die. After some digging in /var/log/kern.log we discovered the workers were being killed by the kernel’s out-of-memory (OOM) killer. Attaching a small swap partition helped, but once you’ve hit swap, things become horribly slow. What’s the point in keeping the worker alive if it’s running a thousand times slower? Clearly we needed to profile and tune. So we did. (The specifics are out of scope for this article, but it involved consolidating N+1 queries and limiting the columns returned by the SELECT.)
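The flavor of the change was consolidating per-record lookups into one batched query that selects only the columns the job needs. A hypothetical example (table and column names are made up):

-- before: one query per record inside a loop
SELECT * FROM line_items WHERE id = 1;
SELECT * FROM line_items WHERE id = 2;
-- after: one round trip, only the needed columns
SELECT id, status, updated_at FROM line_items WHERE id IN (1, 2, 3);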

With our newly tuned SQL our spirits were high as we cranked up the workers again, only to hit the next bottleneck. And this is where databases get frustrating. Each time a delayed_job worker looks for a new job, it runs a query to find out which job to pick up, and it locks the record by setting locked_at and locked_by. The query looks like this:

UPDATE delayed_jobs
SET `delayed_jobs`.`locked_at` = '2016-06-05 11:48:28', 
 `delayed_jobs`.`locked_by` = 'delayed_job.2 host:ip-10-203-174-216 pid:3226' 
WHERE ((run_at <= '2016-06-05 11:48:28' 
AND (locked_at IS NULL OR locked_at < '2016-06-05 07:48:28') OR locked_by = 'delayed_job.2 host:ip-10-203-174-216 pid:3226') 
AND failed_at IS NULL) 
ORDER BY priority ASC, run_at ASC 
LIMIT 1;

The UPDATE does an ORDER BY, which results in a filesort. Filesorts are typically something an index can resolve, so I optimistically added the following:

CREATE INDEX delayed_job_priority
ON delayed_jobs(priority,run_at);

Sadly, this index was completely ignored when I ran an EXPLAIN on my UPDATE. The reason is that MySQL doesn’t execute an UPDATE query the same way it would a SELECT with the same conditions. The index probably made things worse, because now each record update also incurs an index update. I could fork the code and use some type of isolation level in a transaction to get the best of both worlds: an index-based SELECT, and a quick UPDATE on a single record by id. But there are easier solutions to try first.
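For reference, this is roughly how I checked the plan. A minimal sketch with the literal values trimmed down; it assumes MySQL 5.6+, where EXPLAIN works on UPDATE statements (on older versions, EXPLAIN the equivalent SELECT instead):

EXPLAIN UPDATE delayed_jobs
SET locked_at = NOW(), locked_by = 'worker-1'
WHERE run_at <= NOW()
  AND (locked_at IS NULL OR locked_at < NOW() - INTERVAL 4 HOUR)
  AND failed_at IS NULL
ORDER BY priority ASC, run_at ASC
LIMIT 1;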

My UPDATE statements were pushing 40 seconds in some cases according to MySQL. Eventually the lock wait timeout is exceeded and you see an error in the delayed_jobs.log:

Error while reserving job: Mysql2::Error: Lock wait timeout exceeded; 
try restarting transaction

Jobs were moving very slowly, and throwing more workers at the problem didn’t improve anything, because each time a worker picked up a job it waited 40+ seconds. The UPDATE was doing a filesort, and any index was being ignored (and MySQL doesn’t support index hints on an UPDATE). It was pretty clear that all of the jobs from the reprocessing queue needed to find a new home so they stopped weighing down the filesort. I settled on the following solution:

CREATE TABLE delayed_jobs_backup LIKE delayed_jobs;

INSERT INTO delayed_jobs_backup
SELECT * FROM delayed_jobs WHERE queue='new_queue';

DELETE FROM delayed_jobs WHERE queue='new_queue';

This creates a new database table with the same structure as the existing delayed_jobs table. The new table is then populated with the jobs that needed to find a new home (all 1M+ of them), and finally those jobs are deleted from the original delayed_jobs table. Be careful doing this, and run some SELECT/EXPLAIN queries in between to ensure you are doing what you think you are doing. (Deleting 1M+ records from a production database makes me sit up in my chair a bit.)

Looking at MySQL’s process list, I no longer see long-running System locks on my UPDATE statements (presumably because the table size is now small enough that the filesort is mostly painless):

mysql> SHOW FULL PROCESSLIST;
# Id, User, Host, db, Command, Time, State, Info
1, user, 0.0.0.0:34346, localhost, Query, 0, System lock, UPDATE ...

The important columns here are Time (in seconds), State, and Info. This shows that my job locking was happening quickly; I was seeing Time values of 40+ seconds before. I kept referring back to this process list to verify that the UPDATEs remained snappy while I varied the number of workers running and the number of jobs in the queue. I had a goal of keeping the UPDATE system lock times under 2 seconds. Adding more workers pushed the times up. Adding more jobs to the queue pushed the times up. It’s a balance that probably depends very much on what you are processing, how much your database can handle, and what your memory constraints are on the worker servers.

To conclude: my job over the next few days will be to run the following statements to put jobs back into the delayed_jobs table 10,000 at a time (ordering by id so the INSERT and the DELETE touch the same rows):

INSERT INTO delayed_jobs
SELECT * FROM delayed_jobs_backup ORDER BY id LIMIT 10000;

DELETE FROM delayed_jobs_backup ORDER BY id LIMIT 10000;

You can of course automate this, but my objective was met: the jobs can reprocess old records without impacting day-to-day jobs in the default queue. When the delayed_jobs table is almost empty, I move over another batch of jobs from the delayed_jobs_backup table. Rinse and repeat until there are no more jobs left to process. It’s a bit more painful, but day-to-day operations continue to function, and I can cross the reprocessing task off my list of things to do. All without any code changes!

I’ve been reading up on transaction isolation levels, thinking something like a SELECT FOR UPDATE lock might be a worthy contribution to the delayed_job codebase: http://dev.mysql.com/doc/refman/5.7/en/innodb-transaction-isolation-levels.html
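The pattern I have in mind looks roughly like this. It is only a sketch of the idea, not what delayed_job currently does: pick the job id with an index-friendly SELECT ... FOR UPDATE, then update that single row by primary key.

START TRANSACTION;

SELECT id FROM delayed_jobs
WHERE run_at <= NOW()
  AND locked_at IS NULL
  AND failed_at IS NULL
ORDER BY priority ASC, run_at ASC
LIMIT 1
FOR UPDATE;

-- then, using the id returned above (12345 is a placeholder):
UPDATE delayed_jobs
SET locked_at = NOW(), locked_by = 'worker-1'
WHERE id = 12345;

COMMIT;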

Open-source, Software, Web

Inserting Large Data Sets in MySQL

It’s always interesting for me to work with large data sets. The solutions that work at lower orders of magnitude don’t always scale, and I am left with unusable solutions in production. Often the problems require clever refactorings that at a cursory glance appear identical, but somehow skirt around some expensive operation.

I had a requirement to tune a script that was responsible for inserting 300k records into a database table. The implemented solution of iterating through a collection and calling INSERT for each record was not scaling well, and the operation was taking long enough to time out on some runs. This gave me the opportunity to learn about a few things in MySQL, including the profiler and (spoiler!) the multiple-row INSERT syntax.

I needed some real numbers to compare the changes I would be making. My plan was to change one thing at a time and run a benchmark to tell whether the performance was 1) better, 2) worse, or 3) not impacted. MySQL has an easy-to-use profiler for getting this information. Inside the MySQL CLI, you can issue the command:

SET profiling=1;

Any subsequent queries you run will now be profiled. You can see a listing of queries you want to know more about by typing:

SHOW profiles;

This command will show an index of queries that have run, along with their associated Query_ID. To view more information about a particular query, issue the following command replacing x with the Query_ID:

SHOW profile FOR QUERY x;

Here is an example output:

+------------------------------+----------+
| Status                       | Duration |
+------------------------------+----------+
| starting                     | 0.000094 |
| checking permissions         | 0.000003 |
| checking permissions         | 0.000002 |
| checking permissions         | 0.000001 |
| checking permissions         | 0.000003 |
| Opening tables               | 0.000021 |
| System lock                  | 0.000008 |
| init                         | 0.000039 |
| optimizing                   | 0.000012 |
| statistics                   | 0.000717 |
| preparing                    | 0.000014 |
| Creating tmp table           | 0.000023 |
| executing                    | 0.000002 |
| Copying to tmp table         | 0.016192 |
| converting HEAP to MyISAM    | 0.026860 |
| Copying to tmp table on disk | 2.491668 |
| Sorting result               | 0.269554 |
| Sending data                 | 0.001139 |
| end                          | 0.000003 |
| removing tmp table           | 0.066401 |
| end                          | 0.000009 |
| query end                    | 0.000005 |
| closing tables               | 0.000011 |
| freeing items                | 0.000040 |
| logging slow query           | 0.000002 |
| cleaning up                  | 0.000015 |
+------------------------------+----------+

In one iteration of my SQL query, I was spending an excessive amount of time in “Copying to tmp table”. After reading the article http://www.dbtuna.com/article/55/Copying_to_tmp_table_-_MySQL_thread_states, I was able to isolate the cause to an ORDER BY clause in my query that wasn’t strictly necessary. In this example, not too much exciting is going on, which is a Good Thing.

For a comprehensive listing of thread states listed in the Status column, view: http://dev.mysql.com/doc/refman/5.0/en/general-thread-states.html

Now that I know my query is as optimized as it can be, it’s time to pull out the bigger guns. On to plan B: consolidating those INSERT statements!

An INSERT statement, though it appears to execute instantaneously under small loads, is made up of many smaller operations, each with its own cost. The relative expense of these operations is roughly the following (http://dev.mysql.com/doc/refman/5.0/en/insert-speed.html):

  • Connecting: (3)
  • Sending query to server: (2)
  • Parsing query: (2)
  • Inserting row: (1 × size of row)
  • Inserting indexes: (1 × number of indexes)
  • Closing: (1)

As you can see, connecting to the server, sending the query, and parsing it are relatively expensive operations. In the script I was modifying, 300k INSERT statements were generating 300k records. Fortunately for us, MySQL doesn’t force our records to be 1:1 with our INSERT statements, thanks to allowing multiple rows per INSERT. To use this feature, instead of having 3 INSERT statements:

INSERT INTO foo (col1, col2) VALUES (1, 1);
INSERT INTO foo (col1, col2) VALUES (2, 2);
INSERT INTO foo (col1, col2) VALUES (3, 3);

We can instead coalesce them into a single INSERT statement:

INSERT INTO foo (col1, col2) VALUES (1, 1), (2, 2), (3, 3);

How many values can we coalesce into the same INSERT statement? This isn’t driven by a maximum number of records, but rather by server system variables, chiefly bulk_insert_buffer_size: http://dev.mysql.com/doc/refman/5.0/en/server-system-variables.html#sysvar_bulk_insert_buffer_size This can be modified, but the default is 8388608 bytes. (The size of any single statement is also capped by max_allowed_packet.) The exact number of records will vary depending on the number of columns and the amount of data being inserted into those columns. I conservatively chose to coalesce 5k records at a time. I tried to bump this to 10k, but I encountered an exception when I exceeded the limit.
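Both limits can be checked on your server before picking a batch size; these are stock MySQL variables:

SHOW VARIABLES LIKE 'bulk_insert_buffer_size';
SHOW VARIABLES LIKE 'max_allowed_packet';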

With my INSERTs coalesced, I was able to reduce the total number of INSERT statements to 60 (300k / 5k). This yielded a massive performance boost: the script went from taking over an hour to completing in just 2 minutes. Quite a nice trick, considering the data is unchanged.

Is there room for improvement? Absolutely. A statement executed 60 times may be worth preparing, or wrapping inside a transactional block. My real-world tests didn’t yield a significant enough performance boost to make these complexities worth implementing; that may not hold with data in higher orders of magnitude, or with different schema layouts. MySQL also understands INDEX hints, which allow you to suggest indexes that may be missed by the query planner, or to force the inclusion or exclusion of indexes despite what the query planner thinks! (http://dev.mysql.com/doc/refman/5.0/en/index-hints.html)
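To illustrate the transactional-block idea, a minimal sketch using the same illustrative foo table from above, so the batched statements commit together:

START TRANSACTION;
INSERT INTO foo (col1, col2) VALUES (1, 1), (2, 2), (3, 3);
INSERT INTO foo (col1, col2) VALUES (4, 4), (5, 5), (6, 6);
-- remaining batches go here
COMMIT;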

Speaking of indexes: secondary indexes (UNIQUE, BTREE, etc.) can be dropped while the mass INSERT is running, then added back afterwards, side-stepping the per-index insertion cost listed above.
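The drop/recreate itself is just a pair of DDL statements; the index and column names here are illustrative:

DROP INDEX idx_foo_col2 ON foo;
-- ... run the batched INSERTs ...
CREATE INDEX idx_foo_col2 ON foo (col2);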

In the next order of magnitude, I will probably have to rethink my approach of using INSERT statements to load data. According to the MySQL documentation, LOAD DATA INFILE is “roughly 20 times faster” than an INSERT statement. My script would no longer generate statements, but rather output to a file in a comma-delimited format, which could then be loaded, assuming the appropriate permissions are in place.
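A minimal sketch of what that load might look like; the path, table, and column names are placeholders, and the server needs the FILE privilege (or use LOAD DATA LOCAL INFILE):

LOAD DATA INFILE '/tmp/foo.csv'
INTO TABLE foo
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
(col1, col2);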

Happy profiling!

Computers, Events, Hardware, Linux, Personal, Software, Windows

Back from Catastrophe

What a horrid last few days. Around lunch yesterday I restarted my Windows box at work and was greeted with unpleasantness. At the login screen, I got an error informing me that some instruction in memory had performed an illegal operation. After clicking through that, I got the infamous “NT Authority” error telling me I had 60 more seconds with Windows.

On rebooting the machine, I was notified that “C:\Windows\System32\hal.dll” was missing. I called shenanigans, but sure enough, my entire Windows directory appeared to be empty. On closer inspection, Ubuntu informed me that it wasn’t empty; instead, it was receiving an Input/Output error when trying to “ls” the contents. I ran a “chkdsk /r” wholly expecting that NTFS had fucked it all up again, and I seem to have been correct, at least in part.

After the chkdsk, I advanced two seconds further than my last attempt to boot Windows, only to be greeted with some cryptic error informing me that my registry looked about like that train over there. Repairing was not an option, so after much fingernail biting, and a few choice words, I decided that my only remaining option was a reinstallation.

Let me take a moment here to talk about the Windows reload process. My problem isn’t that I think it’s crappy and that someone should do something about it; I know it’s crappy compared to any other operating system’s standards. I can’t tell you how many damn “Next” buttons I had to click, and then how many preferences I had to change. This would have been much easier if I could have used something like Synaptic to check all the programs I wanted to install in one swoop. Additionally, on a *NIX platform, all of my application preferences would have been saved under my home folder. Windows falls flat in that department, so it took me about five hours to get it back to usable.

After that got resolved, I fired up the virtual machine containing my webserver (cheap hosting solution, I know) and found that the MySQL database wouldn’t start. It turned out that the filesystem on my Linux box was corrupted as well. I ran “fsck” and fixed a dozen or so errors, rebooted, and realized that one of the corrupted files happened to be the MySQL user table. Long story short, I learned a lot about troubleshooting MySQL, and got everything restored without losing any data.

Now I am finding other files all over the place that are 0 bytes in size. I have backups, but since the original file still exists when the backup is made, the backup gets overwritten with the new (0-byte) file.

John suspects (and I partly agree) that VMware may be the culprit. This is an incomplete theory, however, and the entire process has left me visibly shaken. We run financial systems on these things. We run nuclear power plants with these things. My net worth is just a number sitting on some hard drive in a basement Wachovia owns somewhere. What happens when that disappears?

Computers, Linux, Open-source, Software, Web, Windows

Ubuntu: connect to MySQL from remote host

Just a quick gotcha that I always stumble upon in Ubuntu. By default, MySQL is set to listen only for requests on localhost (for security reasons). If you want to connect to the database remotely, say using the MySQL Admin GUI tools, you will need to make a quick change.

First run this to edit the MySQL config file with root permissions:

sudo vim /etc/mysql/my.cnf

Next, look for the property “bind-address” (press “/” in Vim to start a search). In Ubuntu Server 7.04 it is on line 47, but this is totally subject to change. Instead of it reading 127.0.0.1, change it to the static IP of the server. In most *nix distributions, this can be found by running “ifconfig”. My server configuration line now looks like this:

bind-address = 168.28.245.99

All that remains is to give the MySQL database a quick restart to apply the change:

sudo /etc/init.d/mysql restart
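One more gotcha worth mentioning: the MySQL account you connect with must also be allowed to connect from the remote host, not just from localhost. A hedged example of granting that access (the user, password, host, and database names are placeholders):

GRANT ALL PRIVILEGES ON mydatabase.* TO 'myuser'@'192.168.1.50'
IDENTIFIED BY 'mypassword';
FLUSH PRIVILEGES;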