Implementing a Bulkload Job to Load a Fact Table from a Data Lake (Apache Hive) and a CSV Source File

Most data warehouses are built using the dimensional modelling technique, since they involve complex hierarchies of relationships. To meet the needs of large data warehouses, database tables are divided into categories based on their level of data abstraction.

These categories are:

  1. Dimensions
  2. Facts

Dimensions-

These catalog-like tables contain descriptive information with related attributes, which are accessed through a distinct key value.

A dimension table acts like the pages of a book rather than its index, since it contains the detailed data for a given context. Dimension tables have a definite scope; for example, an Accounts dimension will hold values such as Savings or Current account. Because of this limited scope, it can be referenced from other tables.


Facts-

Unlike dimension tables, a fact table does not hold descriptive information; instead, it holds references to dimension tables. These tables act as the index of a book rather than its pages. A fact table may reference more than one dimension table to satisfy complex relationships, and it usually contains a large amount of data because it aggregates data from multiple sources.
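
To make the distinction concrete, the hypothetical DDL below sketches a small account dimension and a fact table that references it through a surrogate key. The table and column names here are illustrative only; they are not the ones used later in this job.

    -- Hypothetical dimension table: descriptive attributes, one row per account type
    CREATE TABLE account_d (
        account_sk     INT PRIMARY KEY,    -- surrogate key referenced by the fact table
        account_type   VARCHAR(20),        -- e.g. 'SAVINGS' or 'CURRENT'
        account_status VARCHAR(20)
    );

    -- Hypothetical fact table: measures plus references to dimension surrogate keys
    CREATE TABLE transactions_f (
        account_sk       INT REFERENCES account_d (account_sk),
        transaction_date DATE,
        transaction_amt  DECIMAL(18, 2)
    );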

Our objective here is to populate the fact table from two sources: Apache Hive and a CSV source file. This can be achieved using various built-in components provided by Pentaho Data Integration (PDI).

For small amounts of data we can use these components directly, without altering the implementation. When dealing with a huge data set, however, we need to modify the flow of the simple Pentaho ETL using different utilities available in PDI.

To populate the fact table from these different sources, we follow the data flow diagram below:

[Screenshot 1: data flow diagram]

EXTRACTION-

First, data is extracted from Apache Hive or from the CSV source file.

MERGING-

Data held over from the last run in the hold table is merged into the stage table so that it can be reprocessed. Merging is done based on unique key combinations.
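
As a rough sketch, and assuming the hold and stage tables share the same layout with a unique key formed by two columns (the table names staging_table and hold_table match those used later in this job; key_col1 and key_col2 are placeholder names), the merge amounts to SQL along these lines:

    -- Re-queue held-over rows: copy hold rows into the stage table,
    -- skipping any row whose unique key combination is already staged.
    INSERT INTO staging_table
    SELECT h.*
    FROM hold_table h
    WHERE NOT EXISTS (
        SELECT 1
        FROM staging_table s
        WHERE s.key_col1 = h.key_col1
          AND s.key_col2 = h.key_col2
    );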

TRUNCATE HOLD TABLE -

After merging the previous run's data into the stage table, we can safely truncate the hold table, since its data now lives in the stage table. The hold table is then ready for the next run.

TRANSFORMATION AND LOADING-

Once data is populated into the stage table, we perform various lookups and data cleansing activities.

During this step, the data is transformed and loaded into both the fact and hold tables.
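
For example, the surrogate-key lookup against the category dimension 'wt_category_d' (used later in this job) can be sketched roughly as the query below; the join column category_code is an assumption, and rows with no dimension match are flagged with a surrogate key of -1:

    -- Resolve surrogate keys for the staged rows; unmatched rows get -1.
    SELECT s.*,
           COALESCE(d.category_sk, -1) AS category_sk
    FROM staging_table s
    LEFT JOIN wt_category_d d
        ON d.category_code = s.category_code;   -- assumed join column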

LOAD HOLD TABLE:

Records satisfying the filter condition are moved to the fact table, whereas the remaining data is placed into the hold table for processing during the next run.
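
In SQL terms this routing amounts to something like the statements below. In the PDI job it is done by a single Filter rows step with two output hops; resolved_stage and fact_table are placeholder names for the lookup output and the target fact table.

    -- Rows that resolved to a valid surrogate key go to the fact table ...
    INSERT INTO fact_table
    SELECT * FROM resolved_stage WHERE category_sk <> -1;

    -- ... while unresolved rows are parked in the hold table for the next run.
    INSERT INTO hold_table
    SELECT * FROM resolved_stage WHERE category_sk = -1;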

We are implementing an incremental load job, as this pattern is widely used in data warehouses. Below is the job that bulkloads data into the fact table.

FactLoad_Job.kjb:-

[Screenshot 2: FactLoad_Job.kjb]

If any error is encountered while executing any step of the above job, an email notification is sent immediately using the Mail component of Pentaho Kettle. After sending the email, the job is aborted.

  1. get_update_date.ktr:

    Since this is an incremental load, this transformation gets the last update date up to which records have already been processed, so that only unprocessed records are picked up.
    It saves the date into the variable update_date (a SQL sketch of this incremental-date handling follows after this list).

    [Screenshot 3]
  2. truncate stage table:

    This step truncates the stage table 'staging_table' so that the new data after update_date can be loaded.

    [Screenshot 4]
  3. delete previously created stage files:

    This step finds and deletes the previously created stage load files in the output folder path '${dynamic_path}' matching the RegExp '*_stage.*\.txt'.

    [Screenshot 5]

    [Screenshot 6]
  4. create_combine_stg_load_file.ktr:

    This step pulls data from the Apache Hive source table 'Hive_table'. It creates the stage load files with a '|' separator, in chunks of 200,000 records. The files are created at the '${dynamic_path}/hive_stage' location. If the parent folder is not present, it is created automatically.

    • Replace variables in script? -
      Check this option when you want parameter values in the script to be replaced with variables you have already set.

      [Screenshot 7]
  5. get_stage_filenames:

    This step executes the 'get_stage_filenames.ktr' transformation, a common transformation that gets all stage file names. It finds the previously created stage load files in the folder path '${dynamic_path}' using the RegExp '${file_name}_stage.*\.txt'. The parameters 'directory_path' and 'file_name' are declared in the main job.

    [Screenshot 8]
  6. for_each_stg_file.kjb:

    This step executes the 'set_var_filename.ktr' transformation, a common transformation for setting the file name. The Set Variables step sets the variable 'filename'; its scope is valid in the parent job. 'bulkload_into_stage_table.kjb' then performs the bulkload into 'staging_table' by reading each previously created stage file, one by one.

    [Screenshot 9]
  7. merge hold table into stage based on unique keys:

    In this step, the hold table 'hold_table' is merged into the stage table 'staging_table' based on the unique key combinations (as sketched in the MERGING section above).

  8. truncate hold table:

    This step truncates the 'hold_table' table so that it is empty for the next run.

    [Screenshot 10]
  9. delete previously created fact files:

    This step finds and deletes the previously created fact load files in the output folder path '${dynamic_path}' matching the RegExp '*_fact.*\.txt'.

    [Screenshot 11]
  10. create_fact_files.ktr:

    This step gets the data from the staging table 'staging_table' and joins it with the dimension table 'wt_category_d' to obtain descriptive data and surrogate keys.

    Filter rows filters the rows coming from the previous step: if category_sk is -1, the records go to the 'Load hold table' step; otherwise they go to the 'create_fact_load_files' step.

    'create_fact_load_files' creates the fact load files with a '|' separator, in chunks of 200,000 records. The files are created at the '${dynamic_path}/*_fact' location.

    If the parent folder is not present, it is created automatically.

    [Screenshot 12]
  11. get_fact_filenames.ktr:

    This step finds the previously created fact load files in the folder path '${dynamic_path}' using the RegExp '*_fact.*\.txt'.

    The parameters 'directory_path' and 'file_name' are declared in the main job.

    [Screenshot 13]
  12. for_each_fact_file.kjb:

    The 'set_var.ktr' step gets the rows from the previous result.
    The 'Set Variables' step sets the variable 'filename'; its scope is valid in the parent job.
    'bulkload_combine_fact.kjb' then performs the bulkload into 'wt_digital_refunds_f'.

    • Replace Data: Check this option to replace existing data based on the unique key combinations.

      [Screenshot 14]
  13. modify update_dt to latest date:

    This step updates update_dt in the repository table, from which the date will be fetched during the next incremental run. Here, 'source_last_send' is the repository table that keeps track of the latest date, and 'table_name' is the name of the stage table (see the SQL sketch after this list).

    [Screenshot 15]

    To load data from the CSV source file, the only transformation that needs to change is create_stage_files.ktr: set the file type in the Content tab to CSV.

    [Screenshots 16 and 17]
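
For reference, the incremental-date handling in steps 1, 4 and 13 boils down to SQL along the following lines. The tables source_last_send, Hive_table and staging_table and the columns update_dt and table_name are the ones named above; the source date column and the choice of CURRENT_DATE as the new watermark are assumptions for the sketch.

    -- Step 1 (get_update_date.ktr): fetch the last processed date for this stage table.
    SELECT update_dt
    FROM source_last_send
    WHERE table_name = 'staging_table';

    -- Step 4 (create_combine_stg_load_file.ktr): pull only records not yet processed.
    SELECT *
    FROM Hive_table
    WHERE last_update_dt > '${update_date}';   -- assumed source date column

    -- Step 13 (modify update_dt to latest date): move the watermark forward
    -- so that the next incremental run starts where this one finished.
    UPDATE source_last_send
    SET update_dt = CURRENT_DATE
    WHERE table_name = 'staging_table';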

Conclusion:

The Database lookup and Table output components work fine with tables of a few thousand records, but they perform poorly with huge data sets.

With the help of Pentaho Data Integration utility components and slight changes to the data flow, we can achieve improved performance and maintainability when working with large data sets.

If you need to process data at a huge scale, Apache Spark is a fast and widely used framework built for working over large volumes of data. As Spark is quickly gaining enterprise adoption, Aegis Soft Tech delivers Apache Spark services to make the framework easy and flexible for all your data processing needs.
