Refreshing Big Type 2 Dimension: ETL vs ELT

By Sergey Alentyev – Sr. Database Architect – ADP Cobalt Business Intelligence Platform

There are many new tools that have arrived for working with Big Data, but each of them usually covers only some particular piece of functionality. Relational databases are still the main destination for data warehouses, and before we look for a new magic tool we should think about what we can do to improve and scale the current relational database. In this article I’ll review only one piece of this big area: how to handle very big dimensions in your data warehouse when the data is growing very fast. Specifically, I will look at a Type 2 dimension, where we keep different versions of dimension data for different time periods and effective statuses. These “slowly changing dimensions” can actually change, and grow, really fast. Type 1 would be a simple subset of this task. The other types are less commonly used but can still use a similar technique.

Why is this important? Because this is one of the most common cases where ETL performance degrades quickly as data grows. Throwing more hardware at the problem can help only temporarily, and sometimes doesn’t help at all. Such a situation can then lead somebody to the conclusion (very often the wrong conclusion) that our database system doesn’t scale and cannot handle really big data.

For the test case I am going to use Oracle as a database and Informatica as an ETL tool. But this approach can be applied to any relational database and ETL tool.

Basic situation. Using an ETL tool to refresh the dimension and fact

One of the most common big Type 2 dimensions is Consumer. Some organizations have millions or tens of millions of records there. Another example is a Web Clients dimension, where we need to keep all the details about web clients: full name, operating system, operating system version, web client version, spider name, spider version, consumer or robot flag, etc. It can also grow to tens of millions of rows. There are other cases, and you can probably add your own examples of big dimensions.

For the sake of simplicity we will use one hypothetical dimension table, one fact table, and one staging table that holds the changes for the last refresh period (day, hour, minute, etc.).

CREATE TABLE DIM_SCD2
( DIM_KEY      NUMBER(10)          NOT NULL,
  NATURAL      VARCHAR2(100)       NOT NULL,
  DETAILS      VARCHAR2(100)       NOT NULL,
  BEGIN_DATE   DATE                NOT NULL,
  END_DATE     DATE                )
TABLESPACE WCA_APP_DATA
STORAGE (INITIAL 1M NEXT 1M)
;

where
DIM_KEY – the primary key (we create the constraint and index after we load the sample data)
NATURAL – the column that represents the natural key identifying the dimension record. We have just one column for the natural key to keep our test simple. Usually there are several columns that represent the natural key. For example, for Consumer: first name, last name, address, phone number, etc.
DETAILS – this column represents all of the potential details about the dimension record that we care about. If some detail has changed, we need to update the record. For example, a Consumer’s marital status, or the Web Client consumer flag (we found out that a web client is a robot and change the flag from consumer to non-consumer). Again, we have just one column for simplicity.
BEGIN_DATE and END_DATE show the effective period of the record.

CREATE TABLE FACT
( DIM_KEY      NUMBER(10)          NOT NULL,
  METRIC       NUMBER(10)          NOT NULL )
TABLESPACE WCA_APP_DATA
STORAGE (INITIAL 1M NEXT 1M)
;

In this test, our interest in the FACT table is only from the perspective of how we look up dimension data by natural key to generate DIM_KEY values. That is why the FACT table design is so simple. In a real fact table you usually need more dimensions and metrics.

CREATE TABLE STAGE
( NATURAL      VARCHAR2(100)        NOT NULL,
  DETAILS      VARCHAR2(100)       NOT NULL)  
TABLESPACE WCA_APP_DATA
STORAGE (INITIAL 1M NEXT 1M )
;

We use the staging table only as a source for our tests. How the data got there is out of scope for this article.

Let’s suppose that we have just recently launched our data warehouse system, our dimension is relatively small (100,000 records), and processing volumes are also very modest (10,000 records in STAGE).

Let’s load the sample data set into DIM_SCD2 and STAGE. The FACT table will be empty before the test. We assume we always insert into FACT; the other cases are out of scope for this test.

insert into dim_scd2
with generator as (
    select rownum     id
    from    all_objects 
    where    rownum <= 1000
)
select
    rownum+1000                       dim_key,
    to_char(rownum)||rpad('x',80,'x') natural,
    to_char(rownum+100)               details,
    sysdate - 1000                    begin_date,
    null                              end_date
from
    generator    g1,
    generator    g2
where
  rownum <= 100000
;
commit;

CREATE UNIQUE INDEX DIM_PK ON DIM_SCD2(DIM_KEY)
TABLESPACE WCA_APP_IDX
STORAGE (INITIAL 1M NEXT 1M)
;

ALTER TABLE DIM_SCD2 ADD (
  PRIMARY KEY (DIM_KEY)
  USING INDEX DIM_PK)
;

We assume that during each ETL run (each day, hour, or minute) 90% of the records in the STAGE table will already exist in DIM_SCD2 (by the natural key NATURAL) and 10% will be new records.

insert into stage
-- existing DIM records
select natural, decode(mod(rownum,2),0, details, details||'*')
from dim_scd2
where rownum <= 9000
union all
-- new DIM records
select
    to_char(rownum+100000)||rpad('x',80,'x') natural,
    to_char(rownum+100100)                   details
from all_objects
where rownum <= 1000;
commit;

Also, we pretend that among the existing records 50% have changed details (decode(mod(rownum,2), 0, details, details||'*')). This percentage varies a lot between dimensions and data warehouse systems, but we just need some updates in our test.

Now that we have a sample data set, let’s create a sample ETL process that refreshes the data warehouse. We are going to review two parts of it: refreshing the dimension itself and refreshing the fact using this dimension. By default we use a third-party ETL tool with a separate ETL server for data transformation and processing. As I mentioned before, we use Informatica.

The first part of our workflow is the dimension refresh. It reads the staging table and, using the natural key, looks up the DIM_KEY value. If it is found and DETAILS in the stage differs from the dimension, we update the existing record with the new DETAILS, keeping the other columns the same. Of course, for a Type 2 dimension an update really means updating the old version of a record and inserting a new version. If we can’t find a match by natural key, we insert a new record with a sequence-generated DIM_KEY and with BEGIN_DATE and END_DATE set according to certain rules. One possible way to develop the related mapping is shown in Picture 1 as an iconic view without many implementation details.


Picture 1. m_dim_scd2 mapping
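To make the later comparison with bulk SQL clearer, here is a rough PL/SQL sketch of the row-by-row logic the mapping implements. It is only an illustration of the per-record approach, not the actual Informatica implementation; in particular, the mapping generates DIM_KEY with an Informatica sequence generator, while the sketch borrows the database sequence dim_scd2_seq that is created later in this article.

-- Row-by-row sketch of the dimension refresh logic (illustration only;
-- the real work is done by Informatica transformations, not this block).
DECLARE
  l_key  dim_scd2.dim_key%TYPE;
  l_det  dim_scd2.details%TYPE;
BEGIN
  FOR r IN (SELECT natural AS nat_key, details AS det FROM stage) LOOP
    BEGIN
      -- look up the current version by natural key
      SELECT dim_key, details
        INTO l_key, l_det
        FROM dim_scd2
       WHERE natural = r.nat_key
         AND end_date IS NULL;

      IF l_det <> r.det THEN
        -- type 2 "update": close the old version...
        UPDATE dim_scd2
           SET end_date = SYSDATE
         WHERE dim_key = l_key;
        -- ...and insert the new version
        INSERT INTO dim_scd2 (dim_key, natural, details, begin_date, end_date)
        VALUES (dim_scd2_seq.NEXTVAL, r.nat_key, r.det, SYSDATE, NULL);
      END IF;
    EXCEPTION
      WHEN NO_DATA_FOUND THEN
        -- no match by natural key: a brand new dimension record
        INSERT INTO dim_scd2 (dim_key, natural, details, begin_date, end_date)
        VALUES (dim_scd2_seq.NEXTVAL, r.nat_key, r.det,
                TO_DATE('01/01/1970','mm/dd/yyyy'), NULL);
    END;
  END LOOP;
  COMMIT;
END;
/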

The second part is the fact refresh. Again, we use the fact refresh here only to illustrate how the dimension is used, so for our test case the related mapping is even simpler. We read STAGE, look up DIM_KEY using the natural key NATURAL (it must be there, because we have just run the dimension refresh based on the same staging table), and insert DIM_KEY and 1 for the METRIC column as new records. That’s it.


Picture 2. m_fact mapping

Our testing workflow also looks very simple.


Picture 3. wf_dim_fact workflow

It runs almost instantly…


Picture 4. Baseline workflow wf_dim_fact run

and updates 4,500 records in DIM_SCD2, inserts 5,500 records into DIM_SCD2 (4,500 inserts for the Type 2 updates and 1,000 inserts of completely new records), and inserts 10,000 records into FACT.

But what will happen when we start to process much more data and the dimension starts to grow very significantly?

Let’s say the data has grown 300 times: we now process 3 million records daily, and the dimension has 30 million records. These are practically the real numbers from when we faced a performance problem in our company and had to do something about it.

The new setup script for the bigger data set looks like this:

TRUNCATE TABLE STAGE;
TRUNCATE TABLE DIM_SCD2;
ALTER TABLE DIM_SCD2 DROP PRIMARY KEY DROP INDEX;

insert into dim_scd2
with generator as (
    select rownum     id
    from    all_objects 
    where    rownum <= 10000
)
select
    rownum+1000                       dim_key,
    to_char(rownum)||rpad('x',80,'x') natural,
    to_char(rownum+100)               details,
    sysdate - 1000                    begin_date,
    null                              end_date
from
    generator    g1,
    generator    g2
where
  rownum <= 30000000
;

CREATE UNIQUE INDEX DIM_PK ON DIM_SCD2(DIM_KEY)
TABLESPACE WCA_APP_IDX
STORAGE (INITIAL 1M NEXT 1M)
;

ALTER TABLE DIM_SCD2 ADD (
  PRIMARY KEY (DIM_KEY)
  USING INDEX DIM_PK)
;

insert into stage
with generator as (
    select rownum     id
    from    all_objects 
    where    rownum <= 10000
)
-- existing DIM records
select natural, decode(mod(rownum,2),0, details, details||'*')
from dim_scd2
where rownum <= 2700000
union all
-- new DIM records
select
    to_char(rownum+30000000)||rpad('x',80,'x') natural,
    to_char(rownum+30000100)                   details
from
    generator    g1,
    generator    g2
where rownum <= 300000;
commit;

And we run the workflow again.


Picture 5. Baseline workflow wf_dim_fact run with big data set

Now it runs for almost 28 minutes. It updates 1,350,000 records in DIM_SCD2, inserts 1,650,000 records into DIM_SCD2 (1,350,000 inserts for the Type 2 updates and 300,000 inserts of completely new records), and inserts 3,000,000 records into FACT.

From the Informatica session logs we can see that just the creation of the first session’s lookup files (the data file is almost 7GB and the index file is more than 7GB) took about 9 minutes. For the second session, lookup file creation took almost 5 minutes (the data file is less than 1GB and the index file is more than 7GB). So we spend a lot of time just building lookup structures.

Creating an index on the NATURAL column doesn’t help much, because Informatica builds its own lookup structures and indexes them on its own server.
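For reference, the index in question might look something like the following (a hypothetical reconstruction; it plays no role in the rest of the test):

CREATE INDEX DIM_SCD2_NATURAL_IDX ON DIM_SCD2(NATURAL)
TABLESPACE WCA_APP_IDX
STORAGE (INITIAL 1M NEXT 1M)
;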

Probably for some people about half an hour does not look like a very long time, but if we assume that the data will continue to grow and that we need to run the ETL several times a day, then this is a problem.

Improving Dimension refresh

The first thing that we can do is to improve the dimension refresh itself. Would it run faster if we added more memory, more CPUs, and faster disks to the ETL server? Not much. It is like the Battle of Thermopylae: if the Persians could have added more troops to their already giant army, would they have been more successful against the Spartans? Probably not, because they could still bring only a small number of soldiers to bear at the same time. (You can read more about historical military analogies for different ETL situations at https://collectivegenius.wordpress.com/2012/04/24/winning-etl-battles/.)
What if we change tactics completely and, instead of trying to percolate through the narrow pass of Thermopylae (ETL row-by-row processing), fight on the open field like a Macedonian phalanx (SQL data set processing)?

We will refresh our dimension in three bulk steps. First, we insert the new versions of the changed records into a new staging table, STAGE1. Then we MERGE into the dimension, updating the changed records that become the old versions and inserting the brand-new records. The last step is to insert everything from STAGE1 into the dimension. The reason for having an extra staging table is to make the MERGE logic more straightforward and efficient.


Picture 6. Bulk Dimension Refresh

We will need the extra staging table STAGE1 and a new database sequence to use instead of the Informatica sequence.

CREATE TABLE STAGE1
( NATURAL      VARCHAR2(100)        NOT NULL,
  DETAILS      VARCHAR2(100)       NOT NULL)
TABLESPACE WCA_APP_DATA
STORAGE (INITIAL 1M NEXT 1M)
;
truncate table stage1;

DECLARE
  max_key VARCHAR2(10);
BEGIN
  -- start the new sequence just above the current maximum DIM_KEY
  select to_char(max(dim_key) + 1)
  into max_key
  from dim_scd2;

  EXECUTE IMMEDIATE
   'CREATE SEQUENCE dim_scd2_seq INCREMENT BY 1 START WITH '
                     ||max_key;
END;
/

We put all three SQL statements into the stored procedure DIM_SCD2_REFRESH and replace the complicated mapping m_dim_scd2 with a very simple mapping that just calls this stored procedure.

CREATE OR REPLACE PROCEDURE dim_scd2_refresh AS
BEGIN
    -- 1 --Inserting changed records into another staging table
    -- that can be used later (3) to insert into DIM_SCD2
    INSERT INTO STAGE1
     (natural,
      details)
    SELECT
      s.natural,
      s.details
    FROM  STAGE  s,
          DIM_SCD2 t
    WHERE s.natural = t.natural
      AND s.details <> t.details
    ;

    -- 2 -- Updating changed records (type 2)
    -- and inserting new records.
    MERGE INTO dim_scd2 t
    USING (SELECT DISTINCT natural,
                           details
           FROM STAGE) s
    ON (    s.natural = t.natural)
    WHEN MATCHED THEN
      UPDATE SET end_date = sysdate
      WHERE t.begin_date <= sysdate 
        AND t.end_date IS NULL
        AND s.details <> t.details
    WHEN NOT MATCHED THEN
      INSERT (dim_key,
              natural,
              details,
              begin_date,
              end_date)
       VALUES (dim_scd2_seq.NEXTVAL,
               s.natural,
               s.details,
               to_date('01/01/1970','mm/dd/yyyy'),
               NULL) 
    ;

    -- 3 -- Inserting changed records (type 2) into dimension.
    INSERT INTO dim_scd2
             (dim_key,
              natural,
              details,
              begin_date,
              end_date)
    SELECT    dim_scd2_seq.NEXTVAL,
              s.natural,
              s.details,
              sysdate,
              NULL
    FROM stage1 s;    
    COMMIT;
END;
/

This change more than halves the dimension refresh time. And we can go even further and use parallelism and partitioning for the STAGE table, and probably for DIM_SCD2 as well.
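As an example of that next step, the sketch below shows how the first bulk statement might look with parallel DML and a hash-partitioned copy of the staging table. STAGE_PART, the partition count, and the parallel degrees are all hypothetical; they were not part of the measured test.

-- Hypothetical: hash-partitioned staging table and parallel DML for step 1
-- of the bulk dimension refresh. Names and degrees are illustrative only.
CREATE TABLE STAGE_PART
( NATURAL      VARCHAR2(100)       NOT NULL,
  DETAILS      VARCHAR2(100)       NOT NULL)
TABLESPACE WCA_APP_DATA
PARTITION BY HASH (NATURAL) PARTITIONS 8
;

ALTER SESSION ENABLE PARALLEL DML;

INSERT /*+ append parallel(t 4) */ INTO STAGE1 t
SELECT /*+ parallel(s 4) parallel(d 4) */
       s.natural,
       s.details
FROM   STAGE_PART s,
       DIM_SCD2   d
WHERE  s.natural = d.natural
  AND  s.details <> d.details
;
COMMIT;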


Picture 7. New workflow wf_dim_sp_fact run with optimized dimension refresh

Improving Fact refresh. Join instead of LookUp

Now let us see how we can improve the dimension lookup during the fact table load. From the Informatica session log we can see that the majority of the run time was spent just building the lookup files, and then we process the data row by row again, looking for the keys in those lookup files. Can we switch to data set processing here too, as we did in the previous case? Yes we can. We can simply join the staging table with the dimension and insert the results into the fact table, all in one bulk operation. In a real-world scenario we would need to join with several dimension tables, putting intermediate results into other staging tables. A couple of staging tables is enough: we can reuse them in a cyclic fashion, truncating and reloading one after another (a hypothetical sketch of that pattern follows the test results below). So, let’s create a new session, s_m_fact_join_dim_sp, that will use a new mapping, m_fact_join_dim_sp, which calls the stored procedure fact_join_dim_refresh.

CREATE OR REPLACE PROCEDURE fact_join_dim_refresh AS
BEGIN

    INSERT INTO FACT
      (dim_key,
       metric)
    SELECT
      NVL(d.dim_key,-1),
      1
    FROM  STAGE  s
          left outer join
          DIM_SCD2 d 
          on s.natural = d.natural
    WHERE d.end_date IS NULL
    ;
END;
/

Picture 8. New workflow wf_dim_fact_join_dim_sp run with join instead of lookup

The Fact load time is cut roughly in half.
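For the multi-dimension case mentioned above, a hypothetical sketch of the cyclic staging pattern might look like this. A wider staging table STAGE_WIDE with a second natural key NATURAL2, a second dimension DIM2_SCD2, two reusable intermediate tables FACT_STG_A and FACT_STG_B, and a fact FACT2 are all made-up names used only to illustrate the idea.

-- Hypothetical sketch: resolve one dimension key per pass, cycling two
-- reusable staging tables. None of these objects exist in the test schema.
TRUNCATE TABLE FACT_STG_A;

INSERT INTO FACT_STG_A (dim_key, natural2, metric)
SELECT NVL(d1.dim_key, -1), s.natural2, 1
FROM   STAGE_WIDE s
       LEFT OUTER JOIN DIM_SCD2 d1
       ON  s.natural = d1.natural
       AND d1.end_date IS NULL;

TRUNCATE TABLE FACT_STG_B;

INSERT INTO FACT_STG_B (dim_key, dim2_key, metric)
SELECT a.dim_key, NVL(d2.dim_key, -1), a.metric
FROM   FACT_STG_A a
       LEFT OUTER JOIN DIM2_SCD2 d2
       ON  a.natural2 = d2.natural
       AND d2.end_date IS NULL;

INSERT INTO FACT2 (dim_key, dim2_key, metric)
SELECT dim_key, dim2_key, metric
FROM   FACT_STG_B;
COMMIT;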

Dimension Cache

We are discussing how to efficiently refresh a really big dimension. But if during each ETL run (daily, hourly, etc.) we touch only a small portion of the dimension data, why do we have to build a lookup over the entire giant dimension, or join with the entire giant dimension? In our test case we touch only 10% of the data. In a real case it can be much less.

What if we create an ETL runtime cache of our dimension? The cache will include only the records that were updated or inserted during the dimension refresh, and we can use only these records for the fact refresh lookup or join.

We will adjust our DIM_SCD2 refresh stored procedure to refresh CACHE_SCD2 along the way.

First, we replace STAGE1 with STAGE2, where we keep both the staging data and the matching dimension data.

CREATE TABLE STAGE2
( S_NATURAL    VARCHAR2(100)       NOT NULL,
  S_DETAILS    VARCHAR2(100)       NOT NULL,
  DIM_KEY      NUMBER(10)          ,
  NATURAL      VARCHAR2(100)       ,
  DETAILS      VARCHAR2(100)       ,
  BEGIN_DATE   DATE                ,
  END_DATE     DATE                )
TABLESPACE WCA_APP_DATA
STORAGE (INITIAL 1M NEXT 1M)
;

The CACHE_SCD2 table will look exactly like DIM_SCD2:

CREATE TABLE CACHE_SCD2
( DIM_KEY      NUMBER(10)          NOT NULL,
  NATURAL      VARCHAR2(100)        NOT NULL,
  DETAILS      VARCHAR2(100)       NOT NULL,
  BEGIN_DATE   DATE                NOT NULL,
  END_DATE     DATE                )
TABLESPACE WCA_APP_DATA
STORAGE (INITIAL 1M NEXT 1M)
;

The new stored procedure for the DIM_SCD2 and CACHE_SCD2 refresh:

CREATE OR REPLACE PROCEDURE dim_scd2_cache_refresh AS
  dim_key_border NUMBER(10);
BEGIN
    -- 1 -- Outer join STAGE and DIM_SCD2 and insert into STAGE2.
    -- Where the dimension data is missing, these are new records;
    -- where it is present, we keep DIM_KEY so it can go into CACHE_SCD2.

  execute immediate 'TRUNCATE TABLE STAGE2';
  INSERT INTO STAGE2
  SELECT s.NATURAL,
       s.DETAILS,
       d.DIM_KEY,
       d.NATURAL,
       d.DETAILS,
       d.BEGIN_DATE,
       d.END_DATE
  FROM stage s,
       dim_scd2 d
  WHERE s.NATURAL = D.NATURAL(+);

    -- 2 -- Updating changed records (type 2)
    MERGE INTO dim_scd2 t
    USING (SELECT DISTINCT s_natural,
                           s_details
           FROM STAGE2
           WHERE dim_key IS NOT NULL) s
    ON (    s.s_natural = t.natural)
    WHEN MATCHED THEN
      UPDATE SET end_date = sysdate
      WHERE t.begin_date <= sysdate 
        AND t.end_date IS NULL
        AND s.s_details <> t.details
    ;
    -- 3 -- Inserting unchanged records into cache.
  execute immediate 'TRUNCATE TABLE CACHE_SCD2';    
    INSERT INTO cache_scd2
             (dim_key,
              natural,
              details,
              begin_date,
              end_date)
    SELECT    dim_key,
              natural,
              details,
              begin_date,
              end_date
    FROM stage2
    WHERE natural is not null
      AND s_details = details;    

    -- 4 -- Save the current DIM_KEY sequence value
    SELECT dim_scd2_seq.NEXTVAL
    INTO dim_key_border
    FROM dual; 

    -- 5 -- Inserting changed records into cache.
    INSERT INTO cache_scd2
             (dim_key,
              natural,
              details,
              begin_date,
              end_date)
    SELECT    dim_scd2_seq.NEXTVAL,
              natural,
              details,
              sysdate,
              NULL
    FROM stage2
    WHERE natural is not null
      AND s_details <> details;

    -- 6 -- Inserting new records into cache.
    INSERT INTO cache_scd2
             (dim_key,
              natural,
              details,
              begin_date,
              end_date)
    SELECT    dim_scd2_seq.NEXTVAL,
              s_natural,
              s_details,
              to_date('01/01/1970','mm/dd/yyyy'),
              NULL
    FROM stage2
    WHERE natural is null; 

   -- 7 -- Insert changed and new records into DIM_SCD2.
    INSERT INTO dim_scd2   
             (dim_key,
              natural,
              details,
              begin_date,
              end_date)
    SELECT dim_key,
           natural,
           details,
           begin_date,
           end_date 
    FROM cache_scd2
    WHERE dim_key > dim_key_border
    ;     
    COMMIT;
END;
/

Picture 9. Process model for Dimension and Cache Refresh

We still keep the bulk pace and run pretty fast.

Fact refresh. LookUp with Cache

Now we will test our ETL, calling the new stored procedure DIM_SCD2_CACHE_REFRESH in the first session and, in the second session, looking up against CACHE_SCD2 instead of DIM_SCD2.


Picture 10. Workflow wf_dim_fact_lookup_cahce run with lookup against cache

The Fact is loaded almost three times faster than with the original version.

Improving Fact refresh. Join with Cache

The last test calls the new stored procedure DIM_SCD2_CACHE_REFRESH in the first session and joins with CACHE_SCD2 instead of DIM_SCD2 in the second session.

The second session will call the following stored procedure:

CREATE OR REPLACE PROCEDURE SALENTYEV.fact_join_cache_refresh AS
BEGIN

    INSERT INTO FACT
      (dim_key,
       metric)
    SELECT
      NVL(d.dim_key,-1),
      1
    FROM  STAGE  s
          left outer join
          CACHE_SCD2 d 
          on s.natural = d.natural
    WHERE d.end_date IS NULL
    ;
END;
/

Picture 11. Workflow wf_dim_fact_join_dim_sp run with cache join

This version loads the Fact more than five times faster than the original version.

Comparison and Conclusion

Let’s compare all our results.

Approach (times are minutes:seconds)                     Dimension   Fact    Total
Basic ETL run                                            27:03       5:46    32:51
Dimension ELT + Dimension Lookup during the fact load    12:04       6:24    18:28
Dimension ELT + Dimension Join during the fact load      13:19       3:26    16:47
Dimension ELT + Cache Lookup during the fact load        13:37       2:11    15:50
Dimension ELT + Cache Join during the fact load          12:48       1:02    13:53

In theory, all the “Dimension ELT” runs should show the same time, but each run is slightly different, even though I tried to run them during quiet time on the Oracle database and the Informatica server.

For this testing I used a decent ETL server but a pretty modest database server. Nevertheless, our workflow ran several times faster. In real production deployments we saw up to a 5X improvement for the dimension refresh and about 10X faster runs for the fact refresh.

The main point of all our improvements is to run bulk SQL on the database server. It is faster, but more importantly it scales better than the traditional ETL approach. This method can be used for any batch data warehouse refresh; it doesn’t matter if you update once a day or once a minute. For streaming or message-style data warehouse updates other approaches should be used, but that is a subject for other discussions.

Winning ETL battles

By Sergey Alentyev – Senior Database Architect – Cobalt Intelligence Center of Excellence

By the end of another long day of ETL project development I feel pretty tired. Looking at my ETL tool mapping, I no longer see Sources, Transformations, Lookups, and Targets connected by many arrows. Instead, my imagination shows me a historical battlefield plan with troop positions, phalanxes, fortresses, and cavalry attacks.

Obviously I need some rest.

But this picture didn’t pop up without a reason. Just as a military commander maneuvers the right parts of his troops at the right time, destroys the enemy’s forces, and occupies positions, fortresses, or cities, an ETL architect or developer moves and processes a lot of data from the sources to the staging, fact, and dimension targets. In both cases it is all about efficient utilization of existing resources at the right time. Let’s try to find analogies in military history that can help us better understand such resource utilization. Maybe we will even be able to draw some useful conclusions. Sounds crazy? Let’s see.

Battle of Thermopylae


A plan for ETL battle

We have a powerful database server, a very strong ETL server, plenty of memory and CPUs. Our ETL mappings and workflows are written by very experienced developers. So why does it take so long to run? First, we ship all this massive data from the source database to the ETL server. On the ETL server we process all our data record by record, or, in a slightly better case, we run a few parallel threads where each thread also processes the data row by row. And finally, we ship all the results back to the target database, which is the same as the source database. No matter how much more memory and CPU we add, we won’t scale. It reminds me of something…

480 BC. The Persian emperor Xerxes had amassed a huge army and set out to conquer all of Greece. The Athenian general Themistocles proposed that the allied Greeks block the advance of the Persian army at the pass of Thermopylae. A relatively small army of Spartans and other Greeks led by King Leonidas blocked the narrow pass. You might have seen the movie.


The Decisive Moment (look familiar?)
(Image from http://en.wikipedia.org/wiki/File:Battle_issus_decisive.gif)

If the Persian army had met the Greeks in an open place, they would have crushed them very quickly simply because they vastly outnumbered them (estimates vary significantly among historians: from 70,000 to more than one million Persian warriors against 5,000 to 7,000 Greeks). For two days all the Persian attacks ended with nothing but Persian casualties. On the third day a traitor showed the Persians a secret path, and the Greeks were surrounded. We know the ending from the books and the movies.

Besides the Greeks’ unquestionable courage, military skill, and love of their homeland, there was another reason for the initial failure of the Persian army: well-chosen terrain. The Persians had had a long journey to the battlefield, and they could not use the massive force of their army. They could not process in parallel, so to speak. The pass was 100 meters wide at most; they had to process almost in serial mode.

If we can’t utilize the resources available to us at the right moment, we can lose our ETL battle. Or at least not win it.

Macedonian phalanx

When I see row-by-row processing in an ETL tool, or an INSERT … VALUES … SQL script, or some row-by-row looping like this in PL/SQL:

DECLARE
cursor c is
select …
from …
where …;
BEGIN
FOR r IN c LOOP
INSERT INTO target_test VALUES(r…);
END LOOP;
END;

I imagine two enemy armies standing against each other on the battlefield. One side starts the attack: one warrior starts running towards the enemy line with a battle cry, then another, then another. Not a very powerful attack.

Bulk processing in PL/SQL and ETL tools is better and should make a big difference.
Like this PL/SQL bulk INSERT:

DECLARE
cursor c is
select …
from …
where …;
TYPE t IS TABLE OF c%ROWTYPE;
l t;
BEGIN
OPEN c;
LOOP
FETCH c BULK COLLECT INTO l LIMIT 1000;
FORALL i IN 1..l.COUNT
INSERT INTO target_test VALUES l(i);
COMMIT;
EXIT WHEN c%NOTFOUND;
END LOOP;
CLOSE c;
END;
/

We send 1,000 warriors to attack together. A scary picture! But when they get close to the enemy line, they again attack one by one, just at a faster pace.

INSERT … SELECT … will be more effective and faster; it is closer to the situation where at least the first line attacks at the same time. INSERT … SELECT … in parallel is the situation where many warriors attack together.

When I see massive parallel processing of a huge partitioned table (something like:

INSERT /*+ append parallel(t) */ INTO target_table t
SELECT /*+ parallel(s) */ …
FROM source_partitioned_table s
WHERE …; )

…it resembles a Macedonian phalanx, probably the first known use of parallelism. Each phalangite carried his primary weapon, a sarissa, a double-pointed pike over 6 m (18 ft) in length. The weapons of the first five rows (partitions?) of men all projected beyond the front of the formation. Five men could fight at the same time. The parallel degree is 5! We can rewrite our pseudo-query like this:

INSERT /*+ append parallel(t 5) */ INTO target_table t
SELECT /*+ parallel(s 5) */ …
FROM source_partitioned_table s
WHERE …;


The Phalanx – A Powerhouse Parallel Strategy
Image from http://room162c.edublogs.org/2010/04/19/battle-of-thermopylae/

Besides the courage and the military and political talent of Alexander the Great, the Macedonian phalanx was one of the main reasons why this young man conquered the known world. It was a very effective use of limited resources. Even though his troops were usually smaller than his enemy’s army, they could apply more force concurrently. They ran in parallel!

Napoleon on the Borodino field

I’m trying to improve performance for big aggregation ETL processes where we do a lot of I/O, joins, and sorts. We have a lot of CPU and fast disks, and we added a bunch of memory. At this time there is nothing but our big query running on the database instance.

But I see that we’re not using this big memory. All the parameters show that we should be able to use much more. And we dump intermediate results to temporary segments and then read them back one or two blocks at a time. Why are we not allowed to use more of the resources of our superb database?

Back to history!

1812. The French army (the best army in the world at that time), supported by many European troops and led by Emperor Napoleon (possibly the best commander of all time), invaded Russia. For several months the Russians avoided a decisive battle, fighting locally and exhausting the French army. Napoleon was thirsting for such a battle: he wanted to destroy the entire enemy force in one huge battle, as he had done many times before using his brilliant military talent. Finally, on September 7, 1812, the biggest and bloodiest single-day action up to that time took place.

About 250,000 troops were involved from both sides, with at least 70,000 casualties. Napoleon’s generals saw victory close at hand and thought that a final push would be enough to decide the fate of the Russian army and of the war. They begged Napoleon to deploy his final and best reserve, the Imperial Guard. But during the battle Napoleon had seen how hard the Russians fought and how dearly his progress had been won. Napoleon changed his mind.

“I will most definitely not; I do not want to have it blown up. I am certain of winning the battle without its intervention.”

He refused to commit the Guard as a final reserve so far from France and reliable logistics support. What if the Russians found other resources and attacked tomorrow?

In the end, both sides claimed victory. The French took an empty, burned Moscow but soon left, without provisions, beaten from all sides by the Russians and by a bitter winter.

Everybody who knows something about history knows the final result: of the half-million-strong army of French and other Europeans that invaded Russia, only about 22,000 crossed the border back.

But why am I telling this story now? Because Napoleon at Borodino reminds me of how our best-in-the-world database management system (one that was originally designed for OLTP) behaves when it comes to huge queries that need a lot of resources, where we have to do a lot of I/O, joins, sorts, and so on. We have a lot of memory, but we do not use all of it, or even a big portion of it, automatically. We have to beg Napoleon, our database, to allocate it, using manual memory allocation, hidden parameters, and other tricks. But our OLTP-minded database says: I can’t allocate such huge resources to this one battle (query, report, ETL process). What if other sessions need these resources a second later? “I was always successful with this OLTP approach in my previous campaigns, and I am going to win this battle anyway, even without using my best reserves.”

Sound familiar? “I will most definitely not; I do not want to have it blown up. I am certain of winning the battle without its intervention.”
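To make the “begging” concrete, here is a hypothetical example of the kind of manual intervention meant above: switching a single heavy batch session to manual workarea sizing so that its sorts and hash joins can use more memory than the automatic policy would normally grant. The values are illustrative only, not a recommendation.

-- Hypothetical per-session override for one big batch job only;
-- values are illustrative, not a recommendation.
ALTER SESSION SET workarea_size_policy = MANUAL;
ALTER SESSION SET sort_area_size = 1073741824;   -- ~1GB per sort workarea
ALTER SESSION SET hash_area_size = 2147483647;   -- ~2GB per hash workarea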

Winning the guerilla war

To win the major battles, we switch from our brilliant but OLTP-based database to one of the best MPP databases. Everything that we were struggling with before (all the huge analytic queries, big ETL scans, joins, and sorts) is flying now. But guess what: all the small queries, the online reports, everything that selects and processes small, narrow data sets runs longer. Even after tuning the SQL for MPP, the average small query runs much slower than before. Instead of doing just a few I/Os, it has to send work to all segments, most likely do full table scans with physical reads, and then probably ship some data between segments (servers). So we have to pay a minimum MPP price for every operation.

Imagine you are the Roman general who has just won a big battle against Rome’s rival. The enemy’s troops are destroyed, and the major cities are occupied by Roman forces. But the rebels’ small units are still fighting for their motherland. They pick at your troops daily with very small units, and each time you respond by sending several centuries of heavy infantry to destroy each group of rebels. And each time, you win the individual battle.

But how much time and how many resources do you spend? When many bands attack simultaneously, you send all your reserves, leaving the cities unprotected. At such moments the rebels can take the cities even with relatively small forces, and you can lose everything that was won with such giant effort.

Would it be better to have special small, light cavalry units that can do the job much faster and with fewer resources?

What happens to our MPP super system when we need to run online web applications with hundreds or thousands of really small queries running concurrently? We can quickly exhaust our I/O, CPU, and memory. We would probably need some data marts on OLTP-based databases for such applications.

No doubt the future of data warehouses is Big Data, and the future of ETL is big processing of Big Data. For Big Data processing we need Big Resources. But even when we have them, we need to use them in the right way.

I hope this small article helps ETL and data warehouse developers to better visualize the ETL process and its bulk nature in the new era, and to understand the importance of the right resource utilization a little bit better. I hope it helps you marshal your forces and win your own ETL battles in the future.

Continuous Integration in a Data Warehouse

By Chris Mills – Development Manager – Cobalt Intelligence Center of Excellence

Over the last two years, we have almost tripled the number of developers working on Intelligence applications at Cobalt, gone from 20 to over 70 production releases per year, removed our QA team, and improved production quality from roughly one “must fix” production issue per month to one every six months.  All of this in the face of scalability challenges that come with rapidly increasing data volumes and processing complexity.  Read on if you’d like to know more about how we’ve done this.

Cobalt’s engineering organization made a transition from “waterfall” to Agile development in 2009.  At the time, testing of warehouse and reporting applications was an expensive process — highly manual, and error-prone.  Often the testing phase of a warehouse project would take more time than the design and implementation phases.

There were plenty of disadvantages to this approach, including:

  • Inconsistent testing from release to release. The quality of the testing was influenced by the quality of the documentation and handoffs from team to team, as well as by human factors like system knowledge and attention to detail.
  • All testing, even routine testing, was expensive because it was so manual and because it depended on coordination and communication across teams and environments.
  • Testing happened late in the process. By the time issues were found, developers were often days removed from having made their changes. Another round of handoffs and deployments from Dev to QA was required before issues could be resolved.

 

Cobalt’s transition to Agile was driven by a desire to provide our customers with incremental releases and improved support for the fast-paced worlds of online commerce and digital advertising. Monolithic releases were out, and smaller, more frequent releases, often with short lead times, were in. The existing testing approach was clearly incompatible with this, so the team began to pursue an automation strategy.

The system we have developed for Continuous Integration of all Database and ETL changes relies on a variety of technologies:  Anthill for nightly builds,  Ant for orchestration, SQLUnit for unit testing, and Informatica web services for remotely launching workflows.  Test suites are managed via Ant scripts, which orchestrate the following tasks for each ETL workflow:

  1. Set up the testing environment with seed data.
  2. Ensure that any system-level preconditions for the ETL being tested are met.
  3. Execute the ETL
  4. Execute a series of unit tests
  5. Cleanup the environment so that any data changes made as a result of these steps are removed.

 

A central application (Anthill) controls the scheduling of test runs, and provides an online reporting interface where results can be reviewed.  A history of test runs is also maintained.  Test results are also delivered to the development team via email, and the team treats automated testing failures as the top priority in its daily work.   At any one time the team will have multiple warehouse releases in flight, each of which gets its own Continuous Integration test runs set up in Anthill.

At the time of this writing, more than 10,000 tests are run under automation against various versions of Cobalt’s BI codebase.  Database level tests confirm that DB structures, indexes, grants, and other objects are appropriate after DB deployments.  ETL tests confirm that processing rules and policies are enforced, and that dependencies between ETLs are accounted for.  In the spirit of Test Driven Development, new tests are added to the suite early in new projects rather than after coding is complete.
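As a simple, hypothetical illustration of a database-level check, a test in this spirit might just assert that an expected index exists after a deployment (the real suites here run under SQLUnit; the object names below are only examples):

-- Hypothetical database-level test: report FAIL if an expected index
-- is missing after a deployment. Object names are examples only.
SELECT CASE WHEN COUNT(*) = 1 THEN 'PASS' ELSE 'FAIL' END AS result
FROM   user_indexes
WHERE  index_name = 'DIM_PK'
  AND  table_name = 'DIM_SCD2';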

This automated testing “safety net” has enabled a number of major changes for the Intelligence product team, all of which have had a direct and very positive business impact.  Our ability to execute thousands of tests against any change in an hour or two has shortened project turnaround time dramatically.   Developers have an easy way to get near real time feedback on the impact of their changes, which has improved their efficiency.  Production quality has improved through more test coverage, and because executing the tests via software ensures consistency.

Finally, testing has moved far enough upstream in the development process that the need for a separate testing team has been removed. Headcount that used to be allocated to QA is now fully devoted to Intelligence roadmap development.

We are now approximately two years into our automated testing initiative.  The successes enjoyed by the DB/ETL team from Continuous Integration have spread to the rest of Cobalt’s Intelligence product stack.   Team culture has evolved to the point that testing is an initial consideration for any new work, rather than an after-thought.  We continue to learn and refine, but the initial project goals of improving quality and team velocity have been achieved.   Our team of Intelligence developers did over 70 production releases last year.  Even though we no longer have a separate QA team, our production quality is higher than ever.

In an era of “big data” and increasingly complex and prominent BI applications, the ability to rapidly evolve a data warehouse is more important than ever. The solution here demonstrates not only that robust automated testing is possible in a BI environment, but also that it can bring similarly large business impacts to other organizations that follow a similar approach.

While automated testing is commonplace in the software world we have found it to be quite rare in the data warehousing world.  Heavy system integration, large data volumes, and the variety of technologies in a typical BI environment pose special challenges.  We believe that the degree to which we have automated our testing process is unique, and something that other organizations seeking to improve quality and the pace of their BI development could learn from.

If you would like to share your own experiences with test automation in a warehouse setting, or if you’d like more detail on the above, please comment and we’ll get the conversation going!