Refreshing Big Type 2 Dimension: ETL vs ELT
March 19, 2013
By Sergey Alentyev – Sr. Database Architect – ADP Cobalt Business Intelligence Platform
There are many new tools that have arrived for working with Big Data, but each of them usually covers only some particular functionality. Relational databases are still the main destination for data warehouses, and before we look for a new magic tool we should think about what we can do to improve and scale the current relational database. In this article I’ll review only one piece of this big area: how to handle very big dimensions in your data warehouse when the data is growing very fast. Specifically, I will review a Type 2 dimension, where we keep different versions of dimension data for different time periods and effective status. These “slowly changing dimensions” can in fact change and grow very fast. Type 1 is a simple subset of this task. The other types are less commonly used but can still use a similar technique.
Why is this important? Because this is one of the most common cases where ETL performance degrades quickly as data grows. Throwing more hardware at the problem helps only temporarily, and sometimes doesn’t help at all. Such a situation can then lead somebody to the conclusion (very often a wrong one) that our database system doesn’t scale and cannot handle really big data.
For the test case I am going to use Oracle as a database and Informatica as an ETL tool. But this approach can be applied to any relational database and ETL tool.
Basic situation. Using ETL tool to refresh dimension and fact
One of the most common big Type 2 dimensions is Consumer. Some organizations can have millions or tens of millions of records there. Another example is a Web Clients dimension, where we need to keep all details about web clients: full name, operating system, operating system version, web client version, spider name, spider version, consumer or robot flag, etc. It can also grow to tens of millions of rows. There are other cases, and you can probably add your own example of a big dimension.
For the sake of simplicity we will use one hypothetical dimension table, one fact table and one staging table that keeps changes for the last refreshing time period (day, hour, minute, etc.).
CREATE TABLE DIM_SCD2 (
  DIM_KEY    NUMBER(10)    NOT NULL,
  NATURAL    VARCHAR2(100) NOT NULL,
  DETAILS    VARCHAR2(100) NOT NULL,
  BEGIN_DATE DATE          NOT NULL,
  END_DATE   DATE
)
TABLESPACE WCA_APP_DATA
STORAGE (INITIAL 1M NEXT 1M);
where
DIM_KEY – primary key (we create constraint and index when we load sample data)
NATURAL – the column that represents the natural key identifying the dimension record. We have just one column for the natural key to keep our test simple. Usually there are several columns that represent the natural key. For example, for Consumer: first name, last name, address, phone number, etc.
DETAILS – this column represents all of the potential details about the dimension records that we care about. If some detail has changed, we need to update the record. Examples are a Consumer's marital status or a Web Client's consumer flag (we found out that a web client is a robot and change the flag from consumer to non-consumer). Again, we have just one column for simplicity.
BEGIN_DATE and END_DATE show the effective period of the record.
CREATE TABLE FACT (
  DIM_KEY NUMBER(10) NOT NULL,
  METRIC  NUMBER(10) NOT NULL
)
TABLESPACE WCA_APP_DATA
STORAGE (INITIAL 1M NEXT 1M);
In this test we are interested in the FACT table only from the perspective of how we look up dimension data by natural key to generate DIM_KEY values. This is why the FACT table design is so simple. A real fact table usually needs more dimensions and metrics.
CREATE TABLE STAGE (
  NATURAL VARCHAR2(100) NOT NULL,
  DETAILS VARCHAR2(100) NOT NULL
)
TABLESPACE WCA_APP_DATA
STORAGE (INITIAL 1M NEXT 1M);
We use the staging table only as a source for our tests. How data has appeared there is out of scope for this research.
Let’s suppose that we have just recently launched our data warehouse system and our dimension is relatively small (100,000 records) and processing volumes are also very modest (10,000 records in STAGE).
Let’s load the sample set of the data into DIM and STAGE. The FACT table will be empty before the test. We assume we always insert into FACT. The other cases are out of scope for this test.
insert into dim_scd2
with generator as (
  select rownum id
  from all_objects
  where rownum <= 1000
)
select rownum+1000 dim_key,
       to_char(rownum)||rpad('x',80,'x') natural,
       to_char(rownum+100) details,
       sysdate - 1000 begin_date,
       null end_date
from generator g1, generator g2
where rownum <= 100000;

commit;

CREATE UNIQUE INDEX DIM_PK ON DIM_SCD2(DIM_KEY)
TABLESPACE WCA_APP_IDX
STORAGE (INITIAL 1M NEXT 1M);

ALTER TABLE DIM_SCD2 ADD (
  PRIMARY KEY (DIM_KEY) USING INDEX DIM_PK);
We assume that during each ETL run (each day, hour, minute) the STAGE table data will have 90% of existing (by natural key NATURAL) records in DIM and 10% will be new records.
insert into stage
-- existing DIM records
select natural, decode(mod(rownum,2),0, details, details||'*')
from dim_scd2
where rownum <= 9000
union all
-- new DIM records
select to_char(rownum+100000)||rpad('x',80,'x') natural,
       to_char(rownum+100100) details
from all_objects
where rownum <= 1000;

commit;
Also, we pretend that 50% of the existing records have changed details (decode(mod(rownum,2),0, details, details||'*')). This percentage varies widely between dimensions and data warehouse systems, but we just need to show some updates in our test.
Now that we have a sample data set, let’s create a sample ETL process that refreshes the data warehouse. We are going to review two parts of it: refreshing the dimension itself and refreshing the fact using this dimension. By default we use a third-party ETL tool with a separate ETL server for data transformation and processing. As I mentioned before, we use Informatica.
The first part of our workflow is the dimension refresh. It reads the staging table and, using the natural key, looks up DIM_KEY values. If a match is found and DETAILS in the stage differs from the dimension, we update the existing record with the new DETAILS, keeping the other columns the same. Of course, for a Type 2 dimension "update" really means updating the old version of a record and inserting a new version. If we can’t find a match by natural key, we insert a new record with a sequence-generated DIM_KEY and with BEGIN_DATE and END_DATE set according to certain rules. One possible way to develop the related mapping is shown in Picture 1 as an iconic view without many implementation details.
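The row-by-row logic described above can be sketched in a few lines. This is only an illustration in Python, not the Informatica mapping itself: the function name refresh_row_by_row, the in-memory list of dictionaries, and the rule that every new version begins on the run date are assumptions made for the example.

```python
from datetime import date


def refresh_row_by_row(dim, stage, today, next_key):
    """Apply Type 2 changes one staged row at a time (hypothetical helper).

    dim   : list of dicts with dim_key, natural, details, begin_date, end_date
    stage : list of (natural, details) tuples
    Returns the next unused surrogate key.
    """
    # Lookup structure: natural key -> current (open-ended) dimension row.
    current = {row["natural"]: row for row in dim if row["end_date"] is None}
    for natural, details in stage:
        old = current.get(natural)
        if old is not None and old["details"] == details:
            continue  # match found and nothing changed
        if old is not None:
            old["end_date"] = today  # close the old version
        new = {"dim_key": next_key, "natural": natural, "details": details,
               "begin_date": today, "end_date": None}  # assumed begin-date rule
        dim.append(new)
        current[natural] = new
        next_key += 1
    return next_key


dim = [
    {"dim_key": 1, "natural": "a", "details": "x",
     "begin_date": date(2010, 1, 1), "end_date": None},
    {"dim_key": 2, "natural": "b", "details": "y",
     "begin_date": date(2010, 1, 1), "end_date": None},
]
stage = [("a", "x"), ("b", "y*"), ("c", "z")]
next_key = refresh_row_by_row(dim, stage, date(2013, 3, 19), 3)
```

Here "a" is left alone, "b" gets its old version closed and a new version inserted, and "c" is inserted as a brand-new record. The per-row loop and the lookup dictionary are the parts that become expensive when the dimension grows to tens of millions of rows.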
The second part is the fact refresh. Again, we use the fact refresh here only to illustrate how the dimension is used, so for our test case the related mapping is even simpler. We read STAGE, look up DIM_KEY using the natural key NATURAL (it must be there because we have just run the dimension refresh based on the same staging table), and insert new records with DIM_KEY and “1” for the METRIC column. That’s it.
Our testing workflow also looks very simple.
It runs almost instantly…
and updates 4,500 records in DIM_SCD2, inserts 5,500 records into DIM_SCD2 (4,500 inserts for Type 2 updates and 1,000 inserts of totally new records), and inserts 10,000 records into FACT.
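These counts follow directly from the proportions chosen for the sample data (90% existing natural keys, half of those with changed details). A small sketch of the arithmetic, with a hypothetical helper name expected_counts, makes it easy to predict the counts for a larger data set with the same proportions:

```python
def expected_counts(stage_rows, existing_pct=90, changed_pct=50):
    """Return (updates, inserts, fact_rows) for one refresh of the sample data."""
    existing = stage_rows * existing_pct // 100  # staged rows already in the dimension
    brand_new = stage_rows - existing            # staged rows with an unseen natural key
    changed = existing * changed_pct // 100      # existing rows whose DETAILS differ
    updates = changed                            # old versions that get closed
    inserts = changed + brand_new                # new versions plus brand-new records
    return updates, inserts, stage_rows          # every staged row yields one fact row


small = expected_counts(10_000)
large = expected_counts(3_000_000)
print(small)
print(large)
```

For 10,000 staged rows this gives 4,500 updates, 5,500 inserts and 10,000 fact rows, matching the run above.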
But what will happen when we start to process much more data and the dimension starts to grow very significantly?
Let’s say the data has grown 300 times: we now process 3 million records daily and the dimension has 30 million records. These are practically real numbers from when we faced a real performance problem in our company and had to do something about it.
The new setup script for the bigger data set looks like this:
TRUNCATE TABLE STAGE;
TRUNCATE TABLE DIM_SCD2;
ALTER TABLE DIM_SCD2 DROP PRIMARY KEY DROP INDEX;

insert into dim_scd2
with generator as (
  select rownum id
  from all_objects
  where rownum <= 10000
)
select rownum+1000 dim_key,
       to_char(rownum)||rpad('x',80,'x') natural,
       to_char(rownum+100) details,
       sysdate - 1000 begin_date,
       sysdate + 1000 end_date
from generator g1, generator g2
where rownum <= 30000000;

CREATE UNIQUE INDEX DIM_PK ON DIM_SCD2(DIM_KEY)
TABLESPACE WCA_APP_IDX
STORAGE (INITIAL 1M NEXT 1M);

ALTER TABLE DIM_SCD2 ADD (
  PRIMARY KEY (DIM_KEY) USING INDEX DIM_PK);

insert into stage
with generator as (
  select rownum id
  from all_objects
  where rownum <= 10000
)
-- existing DIM records
select natural, decode(mod(rownum,2),0, details, details||'*')
from dim_scd2
where rownum <= 2700000
union all
-- new DIM records
select to_char(rownum+30000000)||rpad('x',80,'x') natural,
       to_char(rownum+30000100) details
from generator g1, generator g2
where rownum <= 300000;

commit;
And we run the workflow again.
Now it runs for almost 28 minutes. It updates 1,350,000 records in DIM_SCD2, inserts 1,650,000 records into DIM_SCD2 (1,350,000 inserts for Type 2 updates and 300,000 inserts of totally new records), and inserts 3,000,000 records into FACT.
From the Informatica session logs we can see that just creating the lookup files for the first session (the data file is almost 7GB and the index file is more than 7GB) took about 9 minutes. For the second session, creating the lookup files took almost 5 minutes (the data file is less than 1GB and the index file is more than 7GB). So we spend a lot of time just building lookup structures.
Creating an index on the NATURAL column doesn’t help much, because Informatica builds its own lookups and indexes them on its own server.
For some people a run of about half an hour probably does not look very long, but if we assume that the data will continue to grow and that we need to run ETL several times a day, then this is a problem.
Improving Dimension refresh
The first thing that we can do is to improve the dimension refresh itself. Would it run faster if we added more memory, more CPUs, or faster disks to the ETL server? Not much. If the Persians had added more troops to their already giant army at the Battle of Thermopylae, would they have been more successful against the Spartans? Probably not, because they could still bring only a small number of soldiers to bear at the same time. (You can read more about historical military analogies for different ETL situations at https://collectivegenius.wordpress.com/2012/04/24/winning-etl-battles/.)
What if we change tactics completely and, instead of trying to squeeze through the narrow Thermopylae pass (ETL row-by-row processing), fight on the open field like a Macedonian phalanx (SQL data set processing)?
We will refresh our dimension in three bulk steps. First, we insert new versions of changed records in the new stage table STAGE1. Then we MERGE into dimension doing update of changed records that become old versions of these records and inserting brand new records. Last step is to insert everything from STAGE1 into dimension. The reason for having an extra staging table is to make MERGE logic more straightforward and efficient.
We need a new database sequence to use instead of the Informatica sequence.
CREATE TABLE STAGE1 (
  NATURAL VARCHAR2(100) NOT NULL,
  DETAILS VARCHAR2(100) NOT NULL
)
TABLESPACE WCA_APP_DATA
STORAGE (INITIAL 1M NEXT 1M);

truncate table stage1;

DECLARE
  next_key VARCHAR2(10);
BEGIN
  -- Start one above the highest existing key so that the first NEXTVAL
  -- does not collide with a row already in DIM_SCD2.
  select to_char(max(dim_key) + 1) into next_key from dim_scd2;
  EXECUTE IMMEDIATE
    'CREATE SEQUENCE dim_scd2_seq INCREMENT BY 1 START WITH ' || next_key;
END;
/
We put all three SQL statements in the stored procedure DIM_SCD2_REFRESH and replace the complicated mapping m_dim_scd2 with a very simple mapping that just calls this stored procedure.
CREATE OR REPLACE PROCEDURE dim_scd2_refresh
AS
BEGIN
  -- 1
  -- Inserting changed records into another staging table
  -- that can be used later (3) to insert into DIM_SCD2
  INSERT INTO STAGE1 (natural, details)
  SELECT s.natural, s.details
  FROM STAGE s, DIM_SCD2 t
  WHERE s.natural = t.natural
    AND s.details <> t.details;

  -- 2
  -- Updating changed records (type 2)
  -- and inserting new records.
  MERGE INTO dim_scd2 t
  USING (SELECT DISTINCT natural, details FROM STAGE) s
  ON (s.natural = t.natural)
  WHEN MATCHED THEN
    UPDATE SET end_date = sysdate
    WHERE t.begin_date <= sysdate
      AND t.end_date IS NULL
      AND s.details <> t.details
  WHEN NOT MATCHED THEN
    INSERT (dim_key, natural, details, begin_date, end_date)
    VALUES (dim_scd2_seq.NEXTVAL, s.natural, s.details,
            to_date('01/01/1970','mm/dd/yyyy'), NULL);

  -- 3
  -- Inserting changed records (type 2) into dimension.
  INSERT INTO dim_scd2 (dim_key, natural, details, begin_date, end_date)
  SELECT dim_scd2_seq.NEXTVAL, s.natural, s.details, sysdate, NULL
  FROM stage1 s;

  COMMIT;
END;
/
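For readers who want to experiment with the three bulk steps without an Oracle instance, here is a minimal sketch using Python's standard sqlite3 module. It is a stand-in, not the procedure above: SQLite has no MERGE, so step 2 is split into an UPDATE and an INSERT; the sequence is replaced by SQLite's automatic INTEGER PRIMARY KEY assignment; the natural key column is renamed natural_key; and the function name refresh_dimension and the sample rows are assumptions for the example.

```python
import sqlite3


def refresh_dimension(conn, today):
    """Refresh dim_scd2 from stage in set-based steps (no per-row loop)."""
    with conn:  # one transaction, committed on success
        # 1. Park the new versions of changed records in stage1.
        conn.execute("DELETE FROM stage1")
        conn.execute("""
            INSERT INTO stage1 (natural_key, details)
            SELECT s.natural_key, s.details
            FROM stage s
            JOIN dim_scd2 t ON s.natural_key = t.natural_key
            WHERE t.end_date IS NULL AND s.details <> t.details""")
        # 2a. Close the current versions of the changed records.
        conn.execute("""
            UPDATE dim_scd2 SET end_date = :today
            WHERE end_date IS NULL
              AND natural_key IN (SELECT natural_key FROM stage1)""",
                     {"today": today})
        # 2b. Insert records whose natural key is not in the dimension yet.
        conn.execute("""
            INSERT INTO dim_scd2 (natural_key, details, begin_date, end_date)
            SELECT DISTINCT s.natural_key, s.details, '1970-01-01', NULL
            FROM stage s
            WHERE NOT EXISTS (SELECT 1 FROM dim_scd2 t
                              WHERE t.natural_key = s.natural_key)""")
        # 3. Insert the new versions parked in step 1.
        conn.execute("""
            INSERT INTO dim_scd2 (natural_key, details, begin_date, end_date)
            SELECT natural_key, details, :today, NULL FROM stage1""",
                     {"today": today})


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_scd2 (
    dim_key     INTEGER PRIMARY KEY,  -- SQLite assigns max+1; stands in for the sequence
    natural_key TEXT NOT NULL,
    details     TEXT NOT NULL,
    begin_date  TEXT NOT NULL,
    end_date    TEXT
);
CREATE TABLE stage  (natural_key TEXT NOT NULL, details TEXT NOT NULL);
CREATE TABLE stage1 (natural_key TEXT NOT NULL, details TEXT NOT NULL);
INSERT INTO dim_scd2 VALUES (1, 'a', 'x', '2010-01-01', NULL);
INSERT INTO dim_scd2 VALUES (2, 'b', 'y', '2010-01-01', NULL);
INSERT INTO stage VALUES ('a', 'x');   -- unchanged
INSERT INTO stage VALUES ('b', 'y*');  -- changed details
INSERT INTO stage VALUES ('c', 'z');   -- brand-new natural key
""")
refresh_dimension(conn, "2013-03-19")
rows = conn.execute(
    "SELECT dim_key, natural_key, details, end_date "
    "FROM dim_scd2 ORDER BY dim_key").fetchall()
for row in rows:
    print(row)
```

Each step touches the whole set of staged rows in one statement, which is the point of the approach: the database decides how to join and scan, and nothing is processed row by row on a separate server.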
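In the runs summarized in the comparison table at the end of this article, this change makes the dimension refresh session more than twice as fast (12 to 14 minutes instead of 27:03). We can go even further and use parallelism and partitioning for the STAGE table and probably for DIM_SCD2 as well.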
Improving Fact refresh. Join instead of LookUp
Now let us see how we can improve the dimension lookup during the fact table load. From the Informatica session log we can see that the majority of the run time was spent just building lookup files. After that we process the data row by row again, looking for the keys in the lookup files. Can we change this approach to data set processing as we did in the previous case? Yes, we can. We can simply join the staging table with the dimension and insert the results into the fact table, all in one bulk operation. In a real-world scenario we would need to join with several dimension tables, putting intermediate results into other staging tables. A couple of staging tables are enough: we can reuse them in cyclic fashion, truncating and reloading one after another. So, let’s create the new session s_m_fact_join_dim_sp, which uses the new mapping m_fact_join_dim_sp, which in turn calls the stored procedure fact_join_dim_refresh.
CREATE OR REPLACE PROCEDURE fact_join_dim_refresh
AS
BEGIN
  INSERT INTO FACT (dim_key, metric)
  SELECT NVL(d.dim_key,-1), 1
  FROM STAGE s
  LEFT OUTER JOIN DIM_SCD2 d
    ON s.natural = d.natural
  WHERE d.END_DATE IS NULL;
END;
/
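The same join-instead-of-lookup idea can be tried on a toy data set with Python's sqlite3 module. This is a sketch under assumptions, not the procedure above: the column natural_key, the sample rows, and the choice to put the END_DATE filter in the join condition (so that a staged row with no current dimension version still produces a fact row with key -1) are all mine.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_scd2 (dim_key INTEGER PRIMARY KEY, natural_key TEXT NOT NULL,
                       details TEXT NOT NULL, begin_date TEXT NOT NULL,
                       end_date TEXT);
CREATE TABLE stage (natural_key TEXT NOT NULL, details TEXT NOT NULL);
CREATE TABLE fact  (dim_key INTEGER NOT NULL, metric INTEGER NOT NULL);
INSERT INTO dim_scd2 VALUES (1, 'a', 'x',  '2010-01-01', NULL);
INSERT INTO dim_scd2 VALUES (2, 'b', 'y',  '2010-01-01', '2013-03-19');
INSERT INTO dim_scd2 VALUES (3, 'b', 'y*', '2013-03-19', NULL);
INSERT INTO stage VALUES ('a', 'x');
INSERT INTO stage VALUES ('b', 'y*');
INSERT INTO stage VALUES ('q', 'not in the dimension');
""")

with conn:
    # One bulk statement: join the stage with the current dimension versions
    # and insert the resulting keys into the fact table.
    conn.execute("""
        INSERT INTO fact (dim_key, metric)
        SELECT COALESCE(d.dim_key, -1), 1
        FROM stage s
        LEFT JOIN dim_scd2 d
               ON s.natural_key = d.natural_key
              AND d.end_date IS NULL""")

fact_keys = sorted(key for (key,) in conn.execute("SELECT dim_key FROM fact"))
print(fact_keys)
```

The closed version of "b" (key 2) is ignored, the current version (key 3) is used, and the unknown natural key "q" is loaded with the placeholder key -1.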
In the comparison table below, the fact load time drops from 5:46 to 3:26, a reduction of about 40%.
Dimension Cache
We are discussing how to efficiently refresh a really big dimension. But if during each ETL run (daily, hourly, etc.) we touch only a small portion of the dimension data, why do we have to build a lookup for the whole giant dimension, or join with the whole giant dimension? In our test case we touch only 10% of the data. In a real case it can be much less.
What if we can create an ETL runtime cache of our dimension? Cache will include only records that were updated or inserted during dimension refresh. And we can use only these records for fact refresh lookup or join.
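To make the idea concrete, here is a toy sketch with Python's sqlite3 module. It is a simplification under assumptions: the cache is built with a separate query after the refresh, the natural key column is called natural_key, and the sample rows are invented. It only shows that the fact keys resolved through the small cache are the same as those resolved through the full dimension.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_scd2 (dim_key INTEGER PRIMARY KEY, natural_key TEXT NOT NULL,
                       details TEXT NOT NULL, begin_date TEXT NOT NULL,
                       end_date TEXT);
CREATE TABLE stage (natural_key TEXT NOT NULL, details TEXT NOT NULL);
INSERT INTO dim_scd2 VALUES (1, 'a', 'x', '2010-01-01', NULL);
INSERT INTO dim_scd2 VALUES (2, 'b', 'y', '2010-01-01', NULL);
INSERT INTO dim_scd2 VALUES (3, 'c', 'z', '2010-01-01', NULL);
INSERT INTO dim_scd2 VALUES (4, 'd', 'w', '2010-01-01', NULL);
INSERT INTO stage VALUES ('b', 'y');
INSERT INTO stage VALUES ('d', 'w');

-- Runtime cache: only the current dimension rows touched by this run.
CREATE TABLE cache_scd2 AS
SELECT d.*
FROM dim_scd2 d
WHERE d.end_date IS NULL
  AND d.natural_key IN (SELECT natural_key FROM stage);
""")

# The same key-resolution join, run once against each table.
JOIN_SQL = """
    SELECT s.natural_key, COALESCE(d.dim_key, -1)
    FROM stage s
    LEFT JOIN {table} d
           ON s.natural_key = d.natural_key AND d.end_date IS NULL
    ORDER BY s.natural_key"""

via_cache = conn.execute(JOIN_SQL.format(table="cache_scd2")).fetchall()
via_dim = conn.execute(JOIN_SQL.format(table="dim_scd2")).fetchall()
cache_rows = conn.execute("SELECT COUNT(*) FROM cache_scd2").fetchone()[0]
dim_rows = conn.execute("SELECT COUNT(*) FROM dim_scd2").fetchone()[0]
print(cache_rows, dim_rows, via_cache == via_dim)
```

With a 30-million-row dimension and a few million touched rows per run, the second join or lookup works against a table a small fraction of the size of the dimension.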
We will adjust our DIM_SCD2 refresh stored procedure to include CACHE_SCD2 refresh on the way.
First we replace STAGE1 with STAGE2, where we keep both staging data and dimension data.
CREATE TABLE STAGE2 (
  S_NATURAL  VARCHAR2(100) NOT NULL,
  S_DETAILS  VARCHAR2(100) NOT NULL,
  DIM_KEY    NUMBER(10),
  NATURAL    VARCHAR2(100),
  DETAILS    VARCHAR2(100),
  BEGIN_DATE DATE,
  END_DATE   DATE
)
TABLESPACE WCA_APP_DATA
STORAGE (INITIAL 1M NEXT 1M);
CACHE_SCD2 table will look exactly like DIM_SCD2:
CREATE TABLE CACHE_SCD2 (
  DIM_KEY    NUMBER(10)    NOT NULL,
  NATURAL    VARCHAR2(100) NOT NULL,
  DETAILS    VARCHAR2(100) NOT NULL,
  BEGIN_DATE DATE          NOT NULL,
  END_DATE   DATE
)
TABLESPACE WCA_APP_DATA
STORAGE (INITIAL 1M NEXT 1M);
New stored procedure for DIM_SCD2 and CACHE_SCD2 refresh:
CREATE OR REPLACE PROCEDURE dim_scd2_cache_refresh
AS
  dim_key_border NUMBER(10);
BEGIN
  -- 1
  -- Outer join STAGE and DIM_SCD2 and insert into STAGE2:
  -- where dimension data is missing, the record is new;
  -- where dimension data is present, we use dim_key to insert into CACHE_SCD2
  execute immediate 'TRUNCATE TABLE STAGE2';
  INSERT INTO STAGE2
  SELECT s.NATURAL, s.DETAILS,
         d.DIM_KEY, d.NATURAL, d.DETAILS, d.BEGIN_DATE, d.END_DATE
  FROM stage s, dim_scd2 d
  WHERE s.NATURAL = d.NATURAL(+);

  -- 2
  -- Updating changed records (type 2)
  MERGE INTO dim_scd2 t
  USING (SELECT DISTINCT s_natural, s_details
         FROM STAGE2
         WHERE dim_key IS NOT NULL) s
  ON (s.s_natural = t.natural)
  WHEN MATCHED THEN
    UPDATE SET end_date = sysdate
    WHERE t.begin_date <= sysdate
      AND t.end_date IS NULL
      AND s.s_details <> t.details;

  -- 3
  -- Inserting unchanged records into cache.
  execute immediate 'TRUNCATE TABLE CACHE_SCD2';
  INSERT INTO cache_scd2 (dim_key, natural, details, begin_date, end_date)
  SELECT dim_key, natural, details, begin_date, end_date
  FROM stage2
  WHERE natural IS NOT NULL
    AND s_details = details;

  -- 4
  -- Save the current DIM_KEY sequence value
  SELECT dim_scd2_seq.NEXTVAL INTO dim_key_border FROM dual;

  -- 5
  -- Inserting changed records into cache.
  -- The new version carries the staged details (s_details), not the old ones.
  INSERT INTO cache_scd2 (dim_key, natural, details, begin_date, end_date)
  SELECT dim_scd2_seq.NEXTVAL, natural, s_details, sysdate, NULL
  FROM stage2
  WHERE natural IS NOT NULL
    AND s_details <> details;

  -- 6
  -- Inserting new records into cache.
  INSERT INTO cache_scd2 (dim_key, natural, details, begin_date, end_date)
  SELECT dim_scd2_seq.NEXTVAL, s_natural, s_details,
         to_date('01/01/1970','mm/dd/yyyy'), NULL
  FROM stage2
  WHERE natural IS NULL;

  -- 7
  -- Insert changed and new records into DIM_SCD2.
  INSERT INTO dim_scd2 (dim_key, natural, details, begin_date, end_date)
  SELECT dim_key, natural, details, begin_date, end_date
  FROM cache_scd2
  WHERE dim_key > dim_key_border;

  COMMIT;
END;
/
We still keep bulk pace and run pretty fast.
Fact refresh. LookUp with Cache
Now we will test our ETL, calling the new stored procedure DIM_SCD2_CACHE_REFRESH in the first session and in the second session looking up CACHE_SCD2 instead of DIM_SCD2.
The fact is loaded more than two and a half times faster than with the original version (2:11 versus 5:46 in the comparison table below).
Improving Fact refresh. Join with Cache
The last test calls the new stored procedure DIM_SCD2_CACHE_REFRESH in the first session and joins with CACHE_SCD2 instead of DIM_SCD2 in the second session.
The second session calls the following stored procedure:
CREATE OR REPLACE PROCEDURE SALENTYEV.fact_join_cache_refresh
AS
BEGIN
  INSERT INTO FACT (dim_key, metric)
  SELECT NVL(d.dim_key,-1), 1
  FROM STAGE s
  LEFT OUTER JOIN CACHE_SCD2 d
    ON s.natural = d.natural
  WHERE d.END_DATE IS NULL;
END;
/
This version loads the fact more than five times faster than the original version (1:02 versus 5:46 in the comparison table below).
Comparison and Conclusion
Let’s compare all our results.
Release | Dimension (mm:ss) | Fact (mm:ss) | Total (mm:ss) |
Basic ETL run | 27:03 | 5:46 | 32:51 |
Dimension ELT + Dimension Lookup during the fact load | 12:04 | 6:24 | 18:28 |
Dimension ELT + Dimension Join during the fact load | 13:19 | 3:26 | 16:47 |
Dimension ELT + Cache Lookup during the fact load | 13:37 | 2:11 | 15:50 |
Dimension ELT + Cache Join during the fact load | 12:48 | 1:02 | 13:53 |
Theoretically all “Dimension ELT” runs should show the same dimension time, but each run is slightly different even though I tried to run them during quiet periods on the Oracle database and the Informatica server.
For this testing I used a decent ETL server but a pretty modest database server. Nevertheless, our workflow ran more than twice as fast overall (13:53 versus 32:51). In real production deployments we had up to 5X improvement for the dimension refresh and about 10X faster runs for the fact refresh.
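The speedup factors implied by the table can be recomputed with a few lines of Python. The timings are copied from the table; the helper name to_seconds and the dictionary layout are just for this illustration.

```python
def to_seconds(mmss):
    """Convert a 'mm:ss' string into a number of seconds."""
    minutes, seconds = mmss.split(":")
    return int(minutes) * 60 + int(seconds)


# (dimension, fact, total) timings copied from the comparison table.
runs = {
    "Basic ETL run": ("27:03", "5:46", "32:51"),
    "Dimension ELT + Dimension Lookup": ("12:04", "6:24", "18:28"),
    "Dimension ELT + Dimension Join": ("13:19", "3:26", "16:47"),
    "Dimension ELT + Cache Lookup": ("13:37", "2:11", "15:50"),
    "Dimension ELT + Cache Join": ("12:48", "1:02", "13:53"),
}

baseline = [to_seconds(t) for t in runs["Basic ETL run"]]
speedups = {}
for name, times in runs.items():
    seconds = [to_seconds(t) for t in times]
    # Speedup = baseline time / this run's time, per column.
    speedups[name] = tuple(round(b / s, 1) for b, s in zip(baseline, seconds))
    print(name, speedups[name])
```

For the last row this gives roughly 2.1x for the dimension, 5.6x for the fact and 2.4x for the total run.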
The main point in all our improvements is to run bulk SQL on a database server. It is faster but, more importantly, it is scaling better than the traditional ETL approach. Such a method can be used for any batch Data Warehouse Refresh. It doesn’t matter if you update once a day or once a minute. For streaming or message type of Data Warehouse updates other approaches should be used, but that is a subject for other discussions.