Oracle® Database Data Warehousing Guide 10g Release 1 (10.1) Part Number B10736-01 |
|
|
View PDF |
This chapter helps you create and manage a data warehouse, and discusses:
Data transformations are often the most complex and, in terms of processing time, the most costly part of the extraction, transformation, and loading (ETL) process. They can range from simple data conversions to extremely complex data scrubbing techniques. Many, if not all, data transformations can occur within an Oracle database, although transformations are often implemented outside of the database (for example, on flat files) as well.
This chapter introduces techniques for implementing scalable and efficient data transformations within the Oracle Database. The examples in this chapter are relatively simple. Real-world data transformations are often considerably more complex. However, the transformation techniques introduced in this chapter meet the majority of real-world data transformation requirements, often with more scalability and less programming than alternative approaches.
This chapter does not seek to illustrate all of the typical transformations that would be encountered in a data warehouse, but to demonstrate the types of fundamental technology that can be applied to implement these transformations and to provide guidance in how to choose the best techniques.
From an architectural perspective, you can transform your data in two ways:
The data transformation logic for most data warehouses consists of multiple steps. For example, in transforming new records to be inserted into a sales table, there may be separate logical transformation steps to validate each dimension key.
Figure 14-1 offers a graphical way of looking at the transformation logic.
When using Oracle Database as a transformation engine, a common strategy is to implement each transformation as a separate SQL operation and to create a separate, temporary staging table (such as the tables new_sales_step1
and new_sales_step2
in Figure 14-1) to store the incremental results for each step. This load-then-transform strategy also provides a natural checkpointing scheme to the entire transformation process, which enables to the process to be more easily monitored and restarted. However, a disadvantage to multistaging is that the space and time requirements increase.
It may also be possible to combine many simple logical transformations into a single SQL statement or single PL/SQL procedure. Doing so may provide better performance than performing each step independently, but it may also introduce difficulties in modifying, adding, or dropping individual transformations, as well as recovering from failed transformations.
The ETL process flow can be changed dramatically and the database becomes an integral part of the ETL solution.
The new functionality renders some of the former necessary process steps obsolete while some others can be remodeled to enhance the data flow and the data transformation to become more scalable and non-interruptive. The task shifts from serial transform-then-load process (with most of the tasks done outside the database) or load-then-transform process, to an enhanced transform-while-loading.
Oracle offers a wide variety of new capabilities to address all the issues and tasks relevant in an ETL scenario. It is important to understand that the database offers toolkit functionality rather than trying to address a one-size-fits-all solution. The underlying database has to enable the most appropriate ETL process flow for a specific customer need, and not dictate or constrain it from a technical perspective. Figure 14-2 illustrates the new functionality, which is discussed throughout later sections.
You can use the following mechanisms for loading a data warehouse:
Before any data transformations can occur within the database, the raw data must become accessible for the database. One approach is to load it into the database. Chapter 13, " Transportation in Data Warehouses", discusses several techniques for transporting data to an Oracle data warehouse. Perhaps the most common technique for transporting data is by way of flat files.
SQL*Loader is used to move data from flat files into an Oracle data warehouse. During this data load, SQL*Loader can also be used to implement basic data transformations. When using direct-path SQL*Loader, basic data manipulation, such as datatype conversion and simple NULL
handling, can be automatically resolved during the data load. Most data warehouses use direct-path loading for performance reasons.
The conventional-path loader provides broader capabilities for data transformation than a direct-path loader: SQL functions can be applied to any column as those values are being loaded. This provides a rich capability for transformations during the data load. However, the conventional-path loader is slower than direct-path loader. For these reasons, the conventional-path loader should be considered primarily for loading and transforming smaller amounts of data.
The following is a simple example of a SQL*Loader controlfile to load data into the sales
table of the sh
sample schema from an external file sh_sales.dat
. The external flat file sh_sales.dat
consists of sales transaction data, aggregated on a daily level. Not all columns of this external file are loaded into sales
. This external file will also be used as source for loading the second fact table of the sh
sample schema, which is done using an external table:
The following shows the controlfile (sh_sales.ctl
) to load the sales
table:
LOAD DATA INFILE sh_sales.dat APPEND INTO TABLE sales FIELDS TERMINATED BY "|" (PROD_ID, CUST_ID, TIME_ID, CHANNEL_ID, PROMO_ID, QUANTITY_SOLD, AMOUNT_SOLD)
It can be loaded with the following command:
$ sqlldr sh/sh control=sh_sales.ctl direct=true
Another approach for handling external data sources is using external tables. Oracle's external table feature enables you to use external data as a virtual table that can be queried and joined directly and in parallel without requiring the external data to be first loaded in the database. You can then use SQL, PL/SQL, and Java to access the external data.
External tables enable the pipelining of the loading phase with the transformation phase. The transformation process can be merged with the loading process without any interruption of the data streaming. It is no longer necessary to stage the data inside the database for further processing inside the database, such as comparison or transformation. For example, the conversion functionality of a conventional load can be used for a direct-path INSERT
AS
SELECT
statement in conjunction with the SELECT
from an external table.
The main difference between external tables and regular tables is that externally organized tables are read-only. No DML operations (UPDATE
/INSERT
/DELETE
) are possible and no indexes can be created on them.
External tables are a mostly compliant to the existing SQL*Loader functionality and provide superior functionality in most cases. External tables are especially useful for environments where the complete external source has to be joined with existing database objects or when the data has to be transformed in a complex manner. For example, unlike SQL*Loader, you can apply any arbitrary SQL transformation and use the direct path insert method.
You can create an external table named sales_transactions_ext
, representing the structure of the complete sales transaction data, represented in the external file sh_sales.dat
. The product department is especially interested in a cost analysis on product and time. We thus create a fact table named cost
in the sales
history
schema. The operational source data is the same as for the sales
fact table. However, because we are not investigating every dimensional information that is provided, the data in the cost fact table has a coarser granularity than in the sales fact table, for example, all different distribution channels are aggregated.
We cannot load the data into the cost fact table without applying the previously mentioned aggregation of the detailed information, due to the suppression of some of the dimensions.
The external table framework offers a solution to solve this. Unlike SQL*Loader, where you would have to load the data before applying the aggregation, you can combine the loading and transformation within a single SQL DML statement, as shown in the following. You do not have to stage the data temporarily before inserting into the target table.
The object directories must already exist, and point to the directory containing the sh_sales.dat
file as well as the directory containing the bad and log files.
CREATE TABLE sales_transactions_ext (PROD_ID NUMBER, CUST_ID NUMBER, TIME_ID DATE, CHANNEL_ID NUMBER, PROMO_ID NUMBER, QUANTITY_SOLD NUMBER, AMOUNT_SOLD NUMBER(10,2), UNIT_COST NUMBER(10,2), UNIT_PRICE NUMBER(10,2)) ORGANIZATION external (TYPE oracle_loader DEFAULT DIRECTORY data_file_dir ACCESS PARAMETERS (RECORDS DELIMITED BY NEWLINE CHARACTERSET US7ASCII BADFILE log_file_dir:'sh_sales.bad_xt' LOGFILE log_file_dir:'sh_sales.log_xt' FIELDS TERMINATED BY "|" LDRTRIM ( PROD_ID, CUST_ID, TIME_ID DATE(10) "YYYY-MM-DD", CHANNEL_ID, PROMO_ID, QUANTITY_SOLD, AMOUNT_SOLD, UNIT_COST, UNIT_PRICE)) location ('sh_sales.dat') )REJECT LIMIT UNLIMITED;
The external table can now be used from within the database, accessing some columns of the external data only, grouping the data, and inserting it into the costs
fact table:
INSERT /*+ APPEND */ INTO COSTS (TIME_ID, PROD_ID, UNIT_COST, UNIT_PRICE) SELECT TIME_ID, PROD_ID, AVG(UNIT_COST), AVG(amount_sold/quantity_sold) FROM sales_transactions_ext GROUP BY time_id, prod_id;
See Also: Oracle Database SQL Reference for a complete description of external table syntax and restrictions and Oracle Database Utilities for usage examples |
OCI and direct-path APIs are frequently used when the transformation and computation are done outside the database and there is no need for flat file staging.
Export and import are used when the data is inserted as is into the target system. No complex extractions are possible. See Chapter 12, " Extraction in Data Warehouses" for further information.
You have the following choices for transforming data inside the database:
Once data is loaded into the database, data transformations can be executed using SQL operations. There are four basic techniques for implementing SQL data transformations:
The CREATE
TABLE
... AS
SELECT
statement (CTAS) is a powerful tool for manipulating large sets of data. As shown in the following example, many data transformations can be expressed in standard SQL, and CTAS provides a mechanism for efficiently executing a SQL query and storing the results of that query in a new database table. The INSERT
/*+APPEND
*/ ... AS
SELECT
statement offers the same capabilities with existing database tables.
In a data warehouse environment, CTAS is typically run in parallel using NOLOGGING
mode for best performance.
A simple and common type of data transformation is data substitution. In a data substitution transformation, some or all of the values of a single column are modified. For example, our sales
table has a channel_id
column. This column indicates whether a given sales transaction was made by a company's own sales force (a direct sale) or by a distributor (an indirect sale).
You may receive data from multiple source systems for your data warehouse. Suppose that one of those source systems processes only direct sales, and thus the source system does not know indirect sales channels. When the data warehouse initially receives sales data from this system, all sales records have a NULL
value for the sales.channel_id
field. These NULL
values must be set to the proper key value. For example, you can do this efficiently using a SQL function as part of the insertion into the target sales table statement:
The structure of source table sales_activity_direct
is as follows:
DESC sales_activity_direct Name Null? Type ------------ ----- ---------------- SALES_DATE DATE PRODUCT_ID NUMBER CUSTOMER_ID NUMBER PROMOTION_ID NUMBER AMOUNT NUMBER QUANTITY NUMBER INSERT /*+ APPEND NOLOGGING PARALLEL */ INTO sales SELECT product_id, customer_id, TRUNC(sales_date), 3, promotion_id, quantity, amount FROM sales_activity_direct;
Another technique for implementing a data substitution is to use an UPDATE
statement to modify the sales.channel_id
column. An UPDATE
will provide the correct result. However, if the data substitution transformations require that a very large percentage of the rows (or all of the rows) be modified, then, it may be more efficient to use a CTAS statement than an UPDATE
.
Oracle Database's merge functionality extends SQL, by introducing the SQL keyword MERGE
, in order to provide the ability to update or insert a row conditionally into a table or out of line single table views. Conditions are specified in the ON
clause. This is, besides pure bulk loading, one of the most common operations in data warehouse synchronization.
The following discusses various implementations of a merge. The examples assume that new data for the dimension table products is propagated to the data warehouse and has to be either inserted or updated. The table products_delta
has the same structure as products
.
Example 14-1 Merge Operation Using SQL
MERGE INTO products t USING products_delta s ON (t.prod_id=s.prod_id) WHEN MATCHED THEN UPDATE SET t.prod_list_price=s.prod_list_price, t.prod_min_price=s.prod_min_price WHEN NOT MATCHED THEN INSERT (prod_id, prod_name, prod_desc, prod_subcategory, prod_subcategory_desc, prod_category, prod_category_desc, prod_status, prod_list_price, prod_min_price) VALUES (s.prod_id, s.prod_name, s.prod_desc, s.prod_subcategory, s.prod_subcategory_desc, s.prod_category, s.prod_category_desc, s.prod_status, s.prod_list_price, s.prod_min_price);
Many times, external data sources have to be segregated based on logical attributes for insertion into different target objects. It's also frequent in data warehouse environments to fan out the same source data into several target objects. Multitable inserts provide a new SQL statement for these kinds of transformations, where data can either end up in several or exactly one target, depending on the business transformation rules. This insertion can be done conditionally based on business rules or unconditionally.
It offers the benefits of the INSERT
... SELECT
statement when multiple tables are involved as targets. In doing so, it avoids the drawbacks of the two obvious alternatives. You either had to deal with n independent INSERT
… SELECT
statements, thus processing the same source data n times and increasing the transformation workload n times. Alternatively, you had to choose a procedural approach with a per-row determination how to handle the insertion. This solution lacked direct access to high-speed access paths available in SQL.
As with the existing INSERT
... SELECT
statement, the new statement can be parallelized and used with the direct-load mechanism for faster performance.
Example 14-2 Unconditional Insert
The following statement aggregates the transactional sales information, stored in sales_activity_direct
, on a per daily base and inserts into both the sales
and the costs
fact table for the current day.
INSERT ALL INTO sales VALUES (product_id, customer_id, today, 3, promotion_id, quantity_per_day, amount_per_day) INTO costs VALUES (product_id, today, promotion_id, 3, product_cost, product_price) SELECT TRUNC(s.sales_date) AS today, s.product_id, s.customer_id, s.promotion_id, SUM(s.amount) AS amount_per_day, SUM(s.quantity) quantity_per_day, p.prod_min_price*0.8 AS product_cost, p.prod_list_price AS product_price FROM sales_activity_direct s, products p WHERE s.product_id = p.prod_id AND TRUNC(sales_date) = TRUNC(SYSDATE) GROUP BY TRUNC(sales_date), s.product_id, s.customer_id, s.promotion_id, p.prod_min_price*0.8, p.prod_list_price;
Example 14-3 Conditional ALL Insert
The following statement inserts a row into the sales
and costs
tables for all sales transactions with a valid promotion and stores the information about multiple identical orders of a customer in a separate table cum_sales_activity
. It is possible two rows will be inserted for some sales transactions, and none for others.
INSERT ALL WHEN promotion_id IN (SELECT promo_id FROM promotions) THEN INTO sales VALUES (product_id, customer_id, today, 3, promotion_id, quantity_per_day, amount_per_day) INTO costs VALUES (product_id, today, promotion_id, 3, product_cost, product_price) WHEN num_of_orders > 1 THEN INTO cum_sales_activity VALUES (today, product_id, customer_id, promotion_id, quantity_per_day, amount_per_day, num_of_orders) SELECT TRUNC(s.sales_date) AS today, s.product_id, s.customer_id, s.promotion_id, SUM(s.amount) AS amount_per_day, SUM(s.quantity) quantity_per_day, COUNT(*) num_of_orders, p.prod_min_price*0.8 AS product_cost, p.prod_list_price as product_price FROM sales_activity_direct s, products p WHERE s.product_id = p.prod_id AND TRUNC(sales_date) = TRUNC(SYSDATE) GROUP BY TRUNC(sales_date), s.product_id, s.customer_id, s.promotion_id, p.prod_min_price*0.8, p.prod_list_price;
Example 14-4 Conditional FIRST Insert
The following statement inserts into an appropriate shipping manifest according to the total quantity and the weight of a product order. An exception is made for high value orders, which are also sent by express, unless their weight classification is not too high. It assumes the existence of appropriate tables large_freight_shipping
, express_shipping
, and default_shipping
.
INSERT FIRST WHEN (sum_quantity_sold > 10 AND prod_weight_class < 5) OR (sum_quantity_sold > 5 AND prod_weight_class > 5) THEN INTO large_freight_shipping VALUES (time_id, cust_id, prod_id, prod_weight_class, sum_quantity_sold) WHEN sum_amount_sold > 1000 THEN INTO express_shipping VALUES (time_id, cust_id, prod_id, prod_weight_class, sum_amount_sold, sum_quantity_sold) ELSE INTO default_shipping VALUES (time_id, cust_id, prod_id, sum_quantity_sold) SELECT s.time_id, s.cust_id, s.prod_id, p.prod_weight_class, SUM(amount_sold) AS sum_amount_sold, SUM(quantity_sold) AS sum_quantity_sold FROM sales s, products p WHERE s.prod_id = p.prod_id AND s.time_id = TRUNC(SYSDATE) GROUP BY s.time_id, s.cust_id, s.prod_id, p.prod_weight_class;
Example 14-5 Mixed Conditional and Unconditional Insert
The following example inserts new customers into the customers
table and stores all new customers with cust_credit_limit
higher then 4500 in an additional, separate table for further promotions.
INSERT FIRST WHEN cust_credit_limit >= 4500 THEN INTO customers INTO customers_special VALUES (cust_id, cust_credit_limit) ELSE INTO customers SELECT * FROM customers_new;
See Chapter 15, " Maintaining the Data Warehouse" for more information regarding MERGE
operations.
In a data warehouse environment, you can use procedural languages such as PL/SQL to implement complex transformations in the Oracle Database. Whereas CTAS operates on entire tables and emphasizes parallelism, PL/SQL provides a row-based approached and can accommodate very sophisticated transformation rules. For example, a PL/SQL procedure could open multiple cursors and read data from multiple source tables, combine this data using complex business rules, and finally insert the transformed data into one or more target table. It would be difficult or impossible to express the same sequence of operations using standard SQL statements.
Using a procedural language, a specific transformation (or number of transformation steps) within a complex ETL processing can be encapsulated, reading data from an intermediate staging area and generating a new table object as output. A previously generated transformation input table and a subsequent transformation will consume the table generated by this specific transformation. Alternatively, these encapsulated transformation steps within the complete ETL process can be integrated seamlessly, thus streaming sets of rows between each other without the necessity of intermediate staging. You can use table functions to implement such behavior.
Table functions provide the support for pipelined and parallel execution of transformations implemented in PL/SQL, C, or Java. Scenarios as mentioned earlier can be done without requiring the use of intermediate staging tables, which interrupt the data flow through various transformations steps.
A table function is defined as a function that can produce a set of rows as output. Additionally, table functions can take a set of rows as input. Prior to Oracle9i, PL/SQL functions:
Could not take cursors as input.
Could not be parallelized or pipelined.
Now, functions are not limited in these ways. Table functions extend database functionality by allowing:
Multiple rows to be returned from a function.
Results of SQL subqueries (that select multiple rows) to be passed directly to functions.
Functions take cursors as input.
Functions can be parallelized.
Returning result sets incrementally for further processing as soon as they are created. This is called incremental pipelining
Table functions can be defined in PL/SQL using a native PL/SQL interface, or in Java or C using the Oracle Data Cartridge Interface (ODCI).
See Also: PL/SQL User's Guide and Reference for further information and Oracle Data Cartridge Developer's Guide |
Figure 14-3 illustrates a typical aggregation where you input a set of rows and output a set of rows, in that case, after performing a SUM
operation.
The pseudocode for this operation would be similar to:
INSERT INTO Out SELECT * FROM ("Table Function"(SELECT * FROM In));
The table function takes the result of the SELECT
on In
as input and delivers a set of records in a different format as output for a direct insertion into Out
.
Additionally, a table function can fan out data within the scope of an atomic transaction. This can be used for many occasions like an efficient logging mechanism or a fan out for other independent transformations. In such a scenario, a single staging table will be needed.
Figure 14-4 Pipelined Parallel Transformation with Fanout
The pseudocode for this would be similar to:
INSERT INTO target SELECT * FROM (tf2(SELECT * FROM (tf1(SELECT * FROM source))));
This will insert into target
and, as part of tf1
, into Stage
Table
1
within the scope of an atomic transaction.
INSERT INTO target SELECT * FROM tf3(SELT * FROM stage_table1);
Example 14-6 Table Functions Fundamentals
The following examples demonstrate the fundamentals of table functions, without the usage of complex business rules implemented inside those functions. They are chosen for demonstration purposes only, and are all implemented in PL/SQL.
Table functions return sets of records and can take cursors as input. Besides the sh
sample schema, you have to set up the following database objects before using the examples:
CREATE TYPE product_t AS OBJECT ( prod_id NUMBER(6) , prod_name VARCHAR2(50) , prod_desc VARCHAR2(4000) , prod_subcategory VARCHAR2(50) , prod_subcategory_desc VARCHAR2(2000) , prod_category VARCHAR2(50) , prod_category_desc VARCHAR2(2000) , prod_weight_class NUMBER(2) , prod_unit_of_measure VARCHAR2(20) , prod_pack_size VARCHAR2(30) , supplier_id NUMBER(6) , prod_status VARCHAR2(20) , prod_list_price NUMBER(8,2) , prod_min_price NUMBER(8,2) ); / CREATE TYPE product_t_table AS TABLE OF product_t; / COMMIT; CREATE OR REPLACE PACKAGE cursor_PKG AS TYPE product_t_rec IS RECORD ( prod_id NUMBER(6) , prod_name VARCHAR2(50) , prod_desc VARCHAR2(4000) , prod_subcategory VARCHAR2(50) , prod_subcategory_desc VARCHAR2(2000) , prod_category VARCHAR2(50) , prod_category_desc VARCHAR2(2000) , prod_weight_class NUMBER(2) , prod_unit_of_measure VARCHAR2(20) , prod_pack_size VARCHAR2(30) , supplier_id NUMBER(6) , prod_status VARCHAR2(20) , prod_list_price NUMBER(8,2) , prod_min_price NUMBER(8,2)); TYPE product_t_rectab IS TABLE OF product_t_rec; TYPE strong_refcur_t IS REF CURSOR RETURN product_t_rec; TYPE refcur_t IS REF CURSOR; END; / REM artificial help table, used later CREATE TABLE obsolete_products_errors (prod_id NUMBER, msg VARCHAR2(2000));
The following example demonstrates a simple filtering; it shows all obsolete products except the prod_category
Electronics. The table function returns the result set as a set of records and uses a weakly typed ref cursor as input.
CREATE OR REPLACE FUNCTION obsolete_products(cur cursor_pkg.refcur_t) RETURN product_t_table IS prod_id NUMBER(6); prod_name VARCHAR2(50); prod_desc VARCHAR2(4000); prod_subcategory VARCHAR2(50); prod_subcategory_desc VARCHAR2(2000); prod_category VARCHAR2(50); prod_category_desc VARCHAR2(2000); prod_weight_class NUMBER(2); prod_unit_of_measure VARCHAR2(20); prod_pack_size VARCHAR2(30); supplier_id NUMBER(6); prod_status VARCHAR2(20); prod_list_price NUMBER(8,2); prod_min_price NUMBER(8,2); sales NUMBER:=0; objset product_t_table := product_t_table(); i NUMBER := 0; BEGIN LOOP -- Fetch from cursor variable FETCH cur INTO prod_id, prod_name, prod_desc, prod_subcategory, prod_subcategory_desc, prod_category, prod_category_desc, prod_weight_class, prod_unit_of_measure, prod_pack_size, supplier_id, prod_status, prod_list_price, prod_min_price; EXIT WHEN cur%NOTFOUND; -- exit when last row is fetched -- Category Electronics is not meant to be obsolete and will be suppressed IF prod_status='obsolete' AND prod_category != 'Electronics' THEN -- append to collection i:=i+1; objset.extend; objset(i):=product_t( prod_id, prod_name, prod_desc, prod_subcategory, prod_subcategory_desc, prod_category, prod_category_desc, prod_weight_class, prod_unit_of_measure, prod_pack_size, supplier_id, prod_status, prod_list_price, prod_min_price); END IF; END LOOP; CLOSE cur; RETURN objset; END; /
You can use the table function in a SQL statement to show the results. Here we use additional SQL functionality for the output:
SELECT DISTINCT UPPER(prod_category), prod_status FROM TABLE(obsolete_products( CURSOR(SELECT prod_id, prod_name, prod_desc, prod_subcategory, prod_subcategory_desc, prod_category, prod_category_desc, prod_weight_class, prod_unit_of_measure, prod_pack_size, supplier_id, prod_status, prod_list_price, prod_min_price FROM products)));
The following example implements the same filtering than the first one. The main differences between those two are:
This example uses a strong typed REF cursor as input and can be parallelized based on the objects of the strong typed cursor, as shown in one of the following examples.
The table function returns the result set incrementally as soon as records are created.
CREATE OR REPLACE FUNCTION obsolete_products_pipe(cur cursor_pkg.strong_refcur_t) RETURN product_t_table PIPELINED PARALLEL_ENABLE (PARTITION cur BY ANY) IS prod_id NUMBER(6); prod_name VARCHAR2(50); prod_desc VARCHAR2(4000); prod_subcategory VARCHAR2(50); prod_subcategory_desc VARCHAR2(2000); prod_category VARCHAR2(50); prod_category_desc VARCHAR2(2000); prod_weight_class NUMBER(2); prod_unit_of_measure VARCHAR2(20); prod_pack_size VARCHAR2(30); supplier_id NUMBER(6); prod_status VARCHAR2(20); prod_list_price NUMBER(8,2); prod_min_price NUMBER(8,2); sales NUMBER:=0; BEGIN LOOP -- Fetch from cursor variable FETCH cur INTO prod_id, prod_name, prod_desc, prod_subcategory, prod_subcategory_desc, prod_category, prod_category_desc, prod_weight_class, prod_unit_of_measure, prod_pack_size, supplier_id, prod_status, prod_list_price, prod_min_price; EXIT WHEN cur%NOTFOUND; -- exit when last row is fetched IF prod_status='obsolete' AND prod_category !='Electronics' THEN PIPE ROW (product_t( prod_id, prod_name, prod_desc, prod_subcategory, prod_subcategory_desc, prod_category, prod_category_desc, prod_weight_class, prod_unit_of_measure, prod_pack_size, supplier_id, prod_status, prod_list_price, prod_min_price)); END IF; END LOOP; CLOSE cur; RETURN; END; /
You can use the table function as follows:
SELECT DISTINCT prod_category, DECODE(prod_status,'obsolete','NO LONGER AVAILABLE','N/A') FROM TABLE(obsolete_products_pipe( CURSOR(SELECT prod_id, prod_name, prod_desc, prod_subcategory, prod_subcategory_desc, prod_category, prod_category_desc, prod_weight_class, prod_unit_of_measure, prod_pack_size, supplier_id, prod_status, prod_list_price, prod_min_price FROM products)));
We now change the degree of parallelism for the input table products and issue the same statement again:
ALTER TABLE products PARALLEL 4;
The session statistics show that the statement has been parallelized:
SELECT * FROM V$PQ_SESSTAT WHERE statistic='Queries Parallelized'; STATISTIC LAST_QUERY SESSION_TOTAL -------------------- ---------- ------------- Queries Parallelized 1 3 1 row selected.
Table functions are also capable to fanout results into persistent table structures. This is demonstrated in the next example. The function filters returns all obsolete products except a those of a specific prod_category
(default Electronics), which was set to status obsolete
by error. The result set of the table function consists of all other obsolete product categories. The detected wrong prod_id
's are stored in a separate table structure obsolete_products_error
. Note that if a table function is part of an autonomous transaction, it must COMMIT
or ROLLBACK
before each PIPE
ROW
statement to avoid an error in the calling subprogram. Its result set consists of all other obsolete product categories. It furthermore demonstrates how normal variables can be used in conjunction with table functions:
CREATE OR REPLACE FUNCTION obsolete_products_dml(cur cursor_pkg.strong_refcur_t, prod_cat varchar2 DEFAULT 'Electronics') RETURN product_t_table PIPELINED PARALLEL_ENABLE (PARTITION cur BY ANY) IS PRAGMA AUTONOMOUS_TRANSACTION; prod_id NUMBER(6); prod_name VARCHAR2(50); prod_desc VARCHAR2(4000); prod_subcategory VARCHAR2(50); prod_subcategory_desc VARCHAR2(2000); prod_category VARCHAR2(50); prod_category_desc VARCHAR2(2000); prod_weight_class NUMBER(2); prod_unit_of_measure VARCHAR2(20); prod_pack_size VARCHAR2(30); supplier_id NUMBER(6); prod_status VARCHAR2(20); prod_list_price NUMBER(8,2); prod_min_price NUMBER(8,2); sales NUMBER:=0; BEGIN LOOP -- Fetch from cursor variable FETCH cur INTO prod_id, prod_name, prod_desc, prod_subcategory, prod_subcategory_desc, prod_category, prod_category_desc, prod_weight_class, prod_unit_of_measure, prod_pack_size, supplier_id, prod_status, prod_list_price, prod_min_price; EXIT WHEN cur%NOTFOUND; -- exit when last row is fetched IF prod_status='obsolete' THEN IF prod_category=prod_cat THEN INSERT INTO obsolete_products_errors VALUES (prod_id, 'correction: category '||UPPER(prod_cat)||' still available'); COMMIT; ELSE PIPE ROW (product_t( prod_id, prod_name, prod_desc, prod_subcategory, prod_subcategory_desc, prod_category, prod_category_desc, prod_weight_class, prod_unit_of_measure, prod_pack_size, supplier_id, prod_status, prod_list_price, prod_min_price)); END IF; END IF; END LOOP; CLOSE cur; RETURN; END; /
The following query shows all obsolete product groups except the prod_category
Electronics, which was wrongly set to status obsolete
:
SELECT DISTINCT prod_category, prod_status FROM TABLE(obsolete_products_dml( CURSOR(SELECT prod_id, prod_name, prod_desc, prod_subcategory, prod_subcategory_desc, prod_category, prod_category_desc, prod_weight_class, prod_unit_of_measure, prod_pack_size, supplier_id, prod_status, prod_list_price, prod_min_price FROM products)));
As you can see, there are some products of the prod_category
Electronics
that were obsoleted by accident:
SELECT DISTINCT msg FROM obsolete_products_errors;
Taking advantage of the second input variable, you can specify a different product group than Electronics to be considered:
SELECT DISTINCT prod_category, prod_status FROM TABLE(obsolete_products_dml( CURSOR(SELECT prod_id, prod_name, prod_desc, prod_subcategory, prod_subcategory_desc, prod_category, prod_category_desc, prod_weight_class, prod_unit_of_measure, prod_pack_size, supplier_id, prod_status, prod_list_price, prod_min_price FROM products),'Photo'));
Because table functions can be used like a normal table, they can be nested, as shown in the following:
SELECT DISTINCT prod_category, prod_status FROM TABLE(obsolete_products_dml(CURSOR(SELECT * FROM TABLE(obsolete_products_pipe(CURSOR(SELECT prod_id, prod_name, prod_desc, prod_subcategory, prod_subcategory_desc, prod_category, prod_category_desc, prod_weight_class, prod_unit_of_measure, prod_pack_size, supplier_id, prod_status, prod_list_price, prod_min_price FROM products))))));
The biggest advantage of Oracle Database's ETL is its toolkit functionality, where you can combine any of the latter discussed functionality to improve and speed up your ETL processing. For example, you can take an external table as input, join it with an existing table and use it as input for a parallelized table function to process complex business logic. This table function can be used as input source for a MERGE
operation, thus streaming the new information for the data warehouse, provided in a flat file within one single statement through the complete ETL process.
See PL/SQL User's Guide and Reference for details about table functions and the PL/SQL programming. For details about table functions implemented in other languages, see Oracle Data Cartridge Developer's Guide.
The following sections offer examples of typical loading and transformation tasks:
A typical transformation is the key lookup. For example, suppose that sales transaction data has been loaded into a retail data warehouse. Although the data warehouse's sales
table contains a product_id
column, the sales transaction data extracted from the source system contains Uniform Price Codes (UPC) instead of product IDs. Therefore, it is necessary to transform the UPC codes into product IDs before the new sales transaction data can be inserted into the sales
table.
In order to execute this transformation, a lookup table must relate the product_id
values to the UPC codes. This table might be the product
dimension table, or perhaps another table in the data warehouse that has been created specifically to support this transformation. For this example, we assume that there is a table named product
, which has a product_id
and an upc_code
column.
This data substitution transformation can be implemented using the following CTAS statement:
CREATE TABLE temp_sales_step2 NOLOGGING PARALLEL AS SELECT sales_transaction_id, product.product_id sales_product_id, sales_customer_id, sales_time_id, sales_channel_id, sales_quantity_sold, sales_dollar_amount FROM temp_sales_step1, product WHERE temp_sales_step1.upc_code = product.upc_code;
This CTAS statement will convert each valid UPC code to a valid product_id
value. If the ETL process has guaranteed that each UPC code is valid, then this statement alone may be sufficient to implement the entire transformation.
In the preceding example, if you must also handle new sales data that does not have valid UPC codes, you can use an additional CTAS statement to identify the invalid rows:
CREATE TABLE temp_sales_step1_invalid NOLOGGING PARALLEL AS SELECT * FROM temp_sales_step1 WHERE temp_sales_step1.upc_code NOT IN (SELECT upc_code FROM product);
This invalid data is now stored in a separate table, temp_sales_step1_invalid
, and can be handled separately by the ETL process.
Another way to handle invalid data is to modify the original CTAS to use an outer join:
CREATE TABLE temp_sales_step2 NOLOGGING PARALLEL AS SELECT sales_transaction_id, product.product_id sales_product_id, sales_customer_id, sales_time_id, sales_channel_id, sales_quantity_sold, sales_dollar_amount FROM temp_sales_step1, product WHERE temp_sales_step1.upc_code = product.upc_code (+);
Using this outer join, the sales transactions that originally contained invalidated UPC codes will be assigned a product_id
of NULL
. These transactions can be handled later.
Additional approaches to handling invalid UPC codes exist. Some data warehouses may choose to insert null-valued product_id
values into their sales
table, while other data warehouses may not allow any new data from the entire batch to be inserted into the sales
table until all invalid UPC codes have been addressed. The correct approach is determined by the business requirements of the data warehouse. Regardless of the specific requirements, exception handling can be addressed by the same basic SQL techniques as transformations.
A data warehouse can receive data from many different sources. Some of these source systems may not be relational databases and may store data in very different formats from the data warehouse. For example, suppose that you receive a set of sales records from a nonrelational database having the form:
product_id, customer_id, weekly_start_date, sales_sun, sales_mon, sales_tue, sales_wed, sales_thu, sales_fri, sales_sat
The input table looks like the following:
SELECT * FROM sales_input_table; PRODUCT_ID CUSTOMER_ID WEEKLY_ST SALES_SUN SALES_MON SALES_TUE SALES_WED SALES_THU SALES_FRI SALES_SAT ---------- ----------- --------- ---------- ---------- ---------- -------------------- ---------- ---------- 111 222 01-OCT-00 100 200 300 400 500 600 700 222 333 08-OCT-00 200 300 400 500 600 700 800 333 444 15-OCT-00 300 400 500 600 700 800 900
In your data warehouse, you would want to store the records in a more typical relational form in a fact table sales
of the sh
sample schema:
prod_id, cust_id, time_id, amount_sold
Note: A number of constraints on thesales table have been disabled for purposes of this example, because the example ignores a number of table columns for the sake of brevity. |
Thus, you need to build a transformation such that each record in the input stream must be converted into seven records for the data warehouse's sales
table. This operation is commonly referred to as pivoting, and Oracle Database offers several ways to do this.
The result of the previous example will resemble the following:
SELECT prod_id, cust_id, time_id, amount_sold FROM sales; PROD_ID CUST_ID TIME_ID AMOUNT_SOLD ---------- ---------- --------- ----------- 111 222 01-OCT-00 100 111 222 02-OCT-00 200 111 222 03-OCT-00 300 111 222 04-OCT-00 400 111 222 05-OCT-00 500 111 222 06-OCT-00 600 111 222 07-OCT-00 700 222 333 08-OCT-00 200 222 333 09-OCT-00 300 222 333 10-OCT-00 400 222 333 11-OCT-00 500 222 333 12-OCT-00 600 222 333 13-OCT-00 700 222 333 14-OCT-00 800 333 444 15-OCT-00 300 333 444 16-OCT-00 400 333 444 17-OCT-00 500 333 444 18-OCT-00 600 333 444 19-OCT-00 700 333 444 20-OCT-00 800 333 444 21-OCT-00 900
Example 14-7 Pivoting
The following example uses the multitable insert syntax to insert into the demo table sh.sales
some data from an input table with a different structure. The multitable insert statement looks like the following:
INSERT ALL INTO sales (prod_id, cust_id, time_id, amount_sold) VALUES (product_id, customer_id, weekly_start_date, sales_sun) INTO sales (prod_id, cust_id, time_id, amount_sold) VALUES (product_id, customer_id, weekly_start_date+1, sales_mon) INTO sales (prod_id, cust_id, time_id, amount_sold) VALUES (product_id, customer_id, weekly_start_date+2, sales_tue) INTO sales (prod_id, cust_id, time_id, amount_sold) VALUES (product_id, customer_id, weekly_start_date+3, sales_wed) INTO sales (prod_id, cust_id, time_id, amount_sold) VALUES (product_id, customer_id, weekly_start_date+4, sales_thu) INTO sales (prod_id, cust_id, time_id, amount_sold) VALUES (product_id, customer_id, weekly_start_date+5, sales_fri) INTO sales (prod_id, cust_id, time_id, amount_sold) VALUES (product_id, customer_id, weekly_start_date+6, sales_sat) SELECT product_id, customer_id, weekly_start_date, sales_sun, sales_mon, sales_tue, sales_wed, sales_thu, sales_fri, sales_sat FROM sales_input_table;
This statement only scans the source table once and then inserts the appropriate data for each day.