This chapter provides instructions for managing logical change records (LCRs) and Streams tags, as well as instructions for performing a full database export/import in a Streams environment.
This chapter contains these topics:
Each task described in this chapter should be completed by a Streams administrator who has been granted the appropriate privileges, unless specified otherwise.
This section describes managing logical change records (LCRs). Make sure you meet the following requirements when you create or modify an LCR:

- The command_type attribute is consistent with the presence or absence of old column values and the presence or absence of new column values.
- The ddl_text attribute is consistent with the base_table_name, base_table_owner, object_type, object_owner, object_name, and command_type attributes.

Use the following LCR constructors to create LCRs:

- To create a row LCR, use the SYS.LCR$_ROW_RECORD constructor.
- To create a DDL LCR, use the SYS.LCR$_DDL_RECORD constructor. Make sure the DDL text specified in the ddl_text attribute of each DDL LCR conforms to Oracle SQL syntax.

The following example creates a queue in an Oracle database and an apply process associated with the queue. Then, it creates a PL/SQL procedure that constructs a row LCR based on information passed to it and enqueues the row LCR into the queue:
Connect as the strmadmin user and create the queue.

CONNECT strmadmin/strmadminpw

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table    => 'strm04_queue_table',
    storage_clause => NULL,
    queue_name     => 'strm04_queue');
END;
/
Create an apply process associated with the queue. Make sure the apply_captured parameter is set to false when you create the apply process, because the apply process will be applying user-enqueued events, not events captured by a capture process. Also, make sure the apply_user parameter is set to hr, because changes will be applied to the hr.regions table, and the apply user must have privileges to make DML changes to this table.

BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name     => 'strm04_queue',
    apply_name     => 'strm04_apply',
    apply_captured => false,
    apply_user     => 'hr');
END;
/
Add a rule to the apply process rule set that instructs the apply process to apply DML changes to the hr.regions table that were made at the dbs1.net source database.

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name         => 'hr.regions',
    streams_type       => 'apply',
    streams_name       => 'strm04_apply',
    queue_name         => 'strm04_queue',
    include_dml        => true,
    include_ddl        => false,
    include_tagged_lcr => false,
    source_database    => 'dbs1.net');
END;
/
Set the disable_on_error parameter for the apply process to n, so that the apply process is not disabled if it encounters an error.

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name => 'strm04_apply',
    parameter  => 'disable_on_error',
    value      => 'n');
END;
/
Start the apply process.

EXEC DBMS_APPLY_ADM.START_APPLY('strm04_apply');
Create a PL/SQL procedure called construct_row_lcr that constructs a row LCR and then enqueues it into the queue created in Step 1.

CREATE OR REPLACE PROCEDURE construct_row_lcr(
  source_dbname VARCHAR2,
  cmd_type      VARCHAR2,
  obj_owner     VARCHAR2,
  obj_name      VARCHAR2,
  old_vals      SYS.LCR$_ROW_LIST,
  new_vals      SYS.LCR$_ROW_LIST) AS
  eopt      DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop     DBMS_AQ.MESSAGE_PROPERTIES_T;
  enq_msgid RAW(16);
  row_lcr   SYS.LCR$_ROW_RECORD;
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT('strmadmin', NULL, NULL);
  -- Construct the LCR based on information passed to procedure
  row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
    source_database_name => source_dbname,
    command_type         => cmd_type,
    object_owner         => obj_owner,
    object_name          => obj_name,
    old_values           => old_vals,
    new_values           => new_vals);
  -- Enqueue the created row LCR
  DBMS_AQ.ENQUEUE(
    queue_name         => 'strm04_queue',
    enqueue_options    => eopt,
    message_properties => mprop,
    payload            => SYS.AnyData.ConvertObject(row_lcr),
    msgid              => enq_msgid);
END construct_row_lcr;
/
See Also: Oracle9i Supplied PL/SQL Packages and Types Reference for more information about LCR constructors
Create and enqueue LCRs using the construct_row_lcr procedure created in Step 2. First, create a row LCR that inserts a row into the hr.regions table.

CONNECT strmadmin/strmadminpw

DECLARE
  newunit1 SYS.LCR$_ROW_UNIT;
  newunit2 SYS.LCR$_ROW_UNIT;
  newvals  SYS.LCR$_ROW_LIST;
BEGIN
  newunit1 := SYS.LCR$_ROW_UNIT(
    'region_id',
    SYS.AnyData.ConvertNumber(5),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newunit2 := SYS.LCR$_ROW_UNIT(
    'region_name',
    SYS.AnyData.ConvertVarchar2('Moon'),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1, newunit2);
  construct_row_lcr(
    source_dbname => 'dbs1.net',
    cmd_type      => 'INSERT',
    obj_owner     => 'hr',
    obj_name      => 'regions',
    old_vals      => NULL,
    new_vals      => newvals);
END;
/
COMMIT;
You can connect as the hr user and query the hr.regions table to view the applied row change:

CONNECT hr/hr

SELECT * FROM hr.regions;

The row with a region_id of 5 should have Moon for the region_name.
Create a row LCR that updates the row inserted into the hr.regions table.

CONNECT strmadmin/strmadminpw

DECLARE
  oldunit1 SYS.LCR$_ROW_UNIT;
  oldunit2 SYS.LCR$_ROW_UNIT;
  oldvals  SYS.LCR$_ROW_LIST;
  newunit1 SYS.LCR$_ROW_UNIT;
  newvals  SYS.LCR$_ROW_LIST;
BEGIN
  oldunit1 := SYS.LCR$_ROW_UNIT(
    'region_id',
    SYS.AnyData.ConvertNumber(5),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  oldunit2 := SYS.LCR$_ROW_UNIT(
    'region_name',
    SYS.AnyData.ConvertVarchar2('Moon'),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  oldvals := SYS.LCR$_ROW_LIST(oldunit1, oldunit2);
  newunit1 := SYS.LCR$_ROW_UNIT(
    'region_name',
    SYS.AnyData.ConvertVarchar2('Mars'),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1);
  construct_row_lcr(
    source_dbname => 'dbs1.net',
    cmd_type      => 'UPDATE',
    obj_owner     => 'hr',
    obj_name      => 'regions',
    old_vals      => oldvals,
    new_vals      => newvals);
END;
/
COMMIT;
You can connect as the hr user and query the hr.regions table to view the applied row change:

CONNECT hr/hr

SELECT * FROM hr.regions;

The row with a region_id of 5 should have Mars for the region_name.
Create a row LCR that deletes the row from the hr.regions table.

CONNECT strmadmin/strmadminpw

DECLARE
  oldunit1 SYS.LCR$_ROW_UNIT;
  oldunit2 SYS.LCR$_ROW_UNIT;
  oldvals  SYS.LCR$_ROW_LIST;
BEGIN
  oldunit1 := SYS.LCR$_ROW_UNIT(
    'region_id',
    SYS.AnyData.ConvertNumber(5),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  oldunit2 := SYS.LCR$_ROW_UNIT(
    'region_name',
    SYS.AnyData.ConvertVarchar2('Mars'),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  oldvals := SYS.LCR$_ROW_LIST(oldunit1, oldunit2);
  construct_row_lcr(
    source_dbname => 'dbs1.net',
    cmd_type      => 'DELETE',
    obj_owner     => 'hr',
    obj_name      => 'regions',
    old_vals      => oldvals,
    new_vals      => NULL);
END;
/
COMMIT;
You can connect as the hr user and query the hr.regions table to view the applied row change:

CONNECT hr/hr

SELECT * FROM hr.regions;

The row with a region_id of 5 should have been deleted.
Release 9.2.0.2 introduces a new parameter, use_old, in the following member functions of the SYS.LCR$_ROW_RECORD type: GET_LOB_INFORMATION, GET_VALUE, and GET_VALUES.

Currently, the use_old parameter is not documented in the Oracle9i Supplied PL/SQL Packages and Types Reference. The following sections replace the sections for these member functions in the Oracle9i Supplied PL/SQL Packages and Types Reference.
The GET_LOB_INFORMATION member function gets the LOB information for the specified column.
The return value can be one of the following:
DBMS_LCR.NOT_A_LOB        CONSTANT NUMBER := 1;
DBMS_LCR.NULL_LOB         CONSTANT NUMBER := 2;
DBMS_LCR.INLINE_LOB       CONSTANT NUMBER := 3;
DBMS_LCR.EMPTY_LOB        CONSTANT NUMBER := 4;
DBMS_LCR.LOB_CHUNK        CONSTANT NUMBER := 5;
DBMS_LCR.LAST_LOB_CHUNK   CONSTANT NUMBER := 6;
Returns NULL
if the specified column does not exist.
If the command type of the row LCR is UPDATE
, then specifying 'Y'
for the use_old
parameter is a convenient way to get the value of the columns.
MEMBER FUNCTION GET_LOB_INFORMATION(
   value_type   IN  VARCHAR2,
   column_name  IN  VARCHAR2,
   use_old      IN  VARCHAR2 DEFAULT 'Y')
RETURN NUMBER;
The GET_VALUE member function returns the old or new value for the specified column, depending on the value type specified.
If the command type of the row LCR is UPDATE
, then specifying 'Y'
for the use_old
parameter is a convenient way to get the value of a column.
MEMBER FUNCTION GET_VALUE(
   value_type   IN  VARCHAR2,
   column_name  IN  VARCHAR2,
   use_old      IN  VARCHAR2 DEFAULT 'Y')
RETURN SYS.AnyData;
The GET_VALUES member function returns a list of old or new values, depending on the value type specified.
If the command type of the row LCR is UPDATE
, then specifying 'Y'
for the use_old
parameter is a convenient way to get the values of all columns.
MEMBER FUNCTION GET_VALUES(
   value_type  IN  VARCHAR2,
   use_old     IN  VARCHAR2 DEFAULT 'Y')
RETURN SYS.LCR$_ROW_LIST;
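For illustration, a DML handler might use GET_VALUE with use_old => 'Y' to retrieve a column value from a row LCR. The following is a minimal sketch only; the handler name, the hr.regions table and region_name column (borrowed from the example earlier in this chapter), and the 'new' value type literal are illustrative assumptions rather than part of this documentation.

CREATE OR REPLACE PROCEDURE hr.regions_dml_handler(in_any IN SYS.AnyData) IS
  lcr      SYS.LCR$_ROW_RECORD;
  rc       PLS_INTEGER;
  name_val SYS.AnyData;
BEGIN
  -- Extract the row LCR from the AnyData payload passed to the handler
  rc := in_any.GETOBJECT(lcr);
  -- Request the new value of region_name; with use_old => 'Y', the old
  -- value is returned for an UPDATE when no new value is present
  name_val := lcr.GET_VALUE(
                value_type  => 'new',
                column_name => 'region_name',
                use_old     => 'Y');
  -- Apply the row change
  lcr.EXECUTE(true);
END;
/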
The following are general considerations for row changes involving LOBs in a Streams environment:

- DBMS_LOB.WRITE procedure.

The following sections contain information about the requirements you must meet when constructing or processing LOBs and about apply process behavior for LCRs containing LOBs. This section also includes an example that constructs and enqueues LCRs containing LOBs.
If your environment uses LCRs that contain LOB columns, then you must meet the following requirements when you construct these LCRs or process them with an apply handler or a rule-based transformation:

- The data portion of a LOB column must be of type VARCHAR2 or RAW. A VARCHAR2 is interpreted as a CLOB, and a RAW is interpreted as a BLOB.
- LOB WRITE, LOB ERASE, and LOB TRIM are the only valid command types for out-of-line LOBs.
- For LOB WRITE, LOB ERASE, and LOB TRIM LCRs, the old_values collection should be empty or NULL, and new_values should not be empty.
- The lob_offset should be a valid value for LOB WRITE and LOB ERASE LCRs. For all other command types, lob_offset should be NULL, under the assumption that LOB chunks for that column will follow.
- The lob_operation_size should be a valid value for LOB ERASE and LOB TRIM LCRs. For all other command types, lob_operation_size should be NULL.
- LOB TRIM and LOB ERASE are valid command types only for an LCR containing a LOB column with lob_information set to LAST_LOB_CHUNK.
- LOB WRITE is a valid command type only for an LCR containing a LOB column with lob_information set to LAST_LOB_CHUNK or LOB_CHUNK.
- For a LOB column with lob_information set to NULL_LOB, the data portion of the column should be a NULL of VARCHAR2 type (for a CLOB) or a NULL of RAW type (for a BLOB). Otherwise, it is interpreted as a non-NULL inline LOB column.
- Only one LOB column reference with one new chunk value is allowed in each LOB WRITE, LOB ERASE, and LOB TRIM LCR.
- The new LOB chunk value for a LOB ERASE and a LOB TRIM LCR should be a NULL value encapsulated in a SYS.AnyData.

All validation of these requirements is done by an apply process. If these requirements are not met, then an LCR containing a LOB column cannot be applied by an apply process or processed by an apply handler. In this case, the LCR is moved to an exception queue with the rest of the LCRs in the same transaction.
An apply process behaves in the following way when it encounters an LCR that contains a LOB:

- If an LCR whose command type is INSERT or UPDATE has a new LOB that contains data, and the lob_information is not DBMS_LCR.LOB_CHUNK or DBMS_LCR.LAST_LOB_CHUNK, then the data is applied.
- If an LCR whose command type is INSERT or UPDATE has a new LOB that contains no data, and the lob_information is DBMS_LCR.EMPTY_LOB, then it is applied as an empty LOB.
- If an LCR whose command type is INSERT or UPDATE has a new LOB that contains no data, and the lob_information is DBMS_LCR.NULL_LOB or DBMS_LCR.INLINE_LOB, then it is applied as a NULL.
- If an LCR whose command type is INSERT or UPDATE has a new LOB and the lob_information is DBMS_LCR.LOB_CHUNK or DBMS_LCR.LAST_LOB_CHUNK, then any LOB value is ignored. If the command type is INSERT, then an empty LOB is inserted into the column under the assumption that LOB chunks will follow. If the command type is UPDATE, then the column value is ignored under the assumption that LOB chunks will follow.
- If all of the new values in an LCR whose command type is UPDATE are LOBs whose lob_information is DBMS_LCR.LOB_CHUNK or DBMS_LCR.LAST_LOB_CHUNK, then the update is skipped under the assumption that LOB chunks will follow.
- For an LCR whose command type is UPDATE or DELETE, old LOB values are ignored.

The following example illustrates creating a PL/SQL procedure for constructing and enqueuing LCRs containing LOBs. This example assumes that you have prepared your database for Streams by completing the necessary actions in Chapter 11, "Configuring a Streams Environment". Make sure the Streams administrator who runs this script has EXECUTE privilege on the DBMS_AQ and DBMS_APPLY_ADM packages.
/************************* BEGINNING OF SCRIPT ******************************
Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL lob_construct.out

/*
*/

SET ECHO ON
SET FEEDBACK 1
SET NUMWIDTH 10
SET LINESIZE 80
SET TRIMSPOOL ON
SET TAB OFF
SET PAGESIZE 100
SET SERVEROUTPUT ON SIZE 100000

CONNECT strmadmin/strmadminpw

/*
*/

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table => 'lobex_queue_table',
    queue_name  => 'lobex_queue');
END;
/

/*
*/

BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name     => 'strmadmin.lobex_queue',
    apply_name     => 'apply_lob',
    apply_captured => false);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name => 'apply_lob',
    parameter  => 'disable_on_error',
    value      => 'n');
END;
/

BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    'apply_lob');
END;
/

/*
*/

CONNECT sys/change_on_install AS SYSDBA

CREATE USER lob_user IDENTIFIED BY Lob_user_pw;
GRANT CONNECT, RESOURCE TO lob_user;

CONNECT lob_user/lob_user_pw

CREATE TABLE with_clob (a NUMBER PRIMARY KEY, c1 CLOB, c2 CLOB, c3 CLOB);
CREATE TABLE with_blob (a NUMBER PRIMARY KEY, b BLOB);

/*
Granting these privileges enables the Streams administrator to get the LOB length for offset and to perform DML operations on the tables.
*/

GRANT ALL ON with_clob TO strmadmin;
GRANT ALL ON with_blob TO strmadmin;

COMMIT;

/*
*/

CONNECT strmadmin/strmadminpw

CREATE OR REPLACE PROCEDURE enq_row_lcr(
  source_dbname VARCHAR2,
  cmd_type      VARCHAR2,
  obj_owner     VARCHAR2,
  obj_name      VARCHAR2,
  old_vals      SYS.LCR$_ROW_LIST,
  new_vals      SYS.LCR$_ROW_LIST) AS
  eopt      DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop     DBMS_AQ.MESSAGE_PROPERTIES_T;
  enq_msgid RAW(16);
  xr_lcr    SYS.LCR$_ROW_RECORD;
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT('strmadmin', NULL, NULL);
  xr_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
    source_database_name => source_dbname,
    command_type         => cmd_type,
    object_owner         => obj_owner,
    object_name          => obj_name,
    old_values           => old_vals,
    new_values           => new_vals);
  -- Enqueue a row lcr
  DBMS_AQ.ENQUEUE(
    queue_name         => 'lobex_queue',
    enqueue_options    => eopt,
    message_properties => mprop,
    payload            => SYS.AnyData.ConvertObject(xr_lcr),
    msgid              => enq_msgid);
END enq_row_lcr;
/

SHOW ERRORS

/*
*/ -- Description of each variable: -- src_dbname : Source database name -- tab_owner : Table owner -- tab_name : Table name -- col_name : Name of the CLOB column -- new_vals : SYS.LCR$_ROW_LIST containing primary key and supplementally -- logged colums -- clob_data : CLOB that contains data to be sent -- offset : Offset from which data should be sent, default is 1 -- lsize : Size of data to be sent, default is 0 -- chunk_size : Size used for creating LOB chunks, default is 2048 CREATE OR REPLACE FUNCTION do_enq_clob(src_dbname VARCHAR2, tab_owner VARCHAR2, tab_name VARCHAR2, col_name VARCHAR2, new_vals SYS.LCR$_ROW_LIST, clob_data CLOB, offset NUMBER default 1, lsize NUMBER default 0, chunk_size NUMBER default 2048) RETURN NUMBER IS lob_offset NUMBER; -- maintain lob offset newunit SYS.LCR$_ROW_UNIT; tnewvals SYS.LCR$_ROW_LIST; lob_flag NUMBER; lob_data VARCHAR2(32767); lob_size NUMBER; unit_pos NUMBER; final_size NUMBER; exit_flg BOOLEAN; c_size NUMBER; i NUMBER; BEGIN lob_size := DBMS_LOB.GETLENGTH(clob_data); unit_pos := new_vals.count + 1; tnewvals := new_vals; c_size := chunk_size; i := 0; -- validate parameters IF (unit_pos <= 1) THEN DBMS_OUTPUT.PUT_LINE('Invalid new_vals list'); RETURN 1; END IF; IF (c_size < 1) THEN DBMS_OUTPUT.PUT_LINE('Invalid LOB chunk size'); RETURN 1; END IF; IF (lsize < 0 OR lsize > lob_size) THEN DBMS_OUTPUT.PUT_LINE('Invalid LOB size'); RETURN 1; END IF; IF (offset < 1 OR offset >= lob_size) THEN DBMS_OUTPUT.PUT_LINE('Invalid lob offset'); RETURN 1; ELSE lob_offset := offset; END IF; -- calculate final size IF (lsize = 0) THEN final_size := lob_size; ELSE final_size := lob_offset + lsize; END IF; -- The following output lines are for debugging purposes only. -- DBMS_OUTPUT.PUT_LINE('Final size: ' || final_size); -- DBMS_OUTPUT.PUT_LINE('Lob size: ' || lob_size); IF (final_size < 1 OR final_size > lob_size) THEN DBMS_OUTPUT.PUT_LINE('Invalid lob size'); RETURN 1; END IF; -- expand new_vals list for LOB column tnewvals.extend(); exit_flg := FALSE; -- Enqueue all LOB chunks LOOP -- The following output line is for debugging purposes only. DBMS_OUTPUT.PUT_LINE('About to write chunk#' || i); i := i + 1; -- check if last LOB chunk IF ((lob_offset + c_size) < final_size) THEN lob_flag := DBMS_LCR.LOB_CHUNK; ELSE lob_flag := DBMS_LCR.LAST_LOB_CHUNK; exit_flg := TRUE; -- The following output line is for debugging purposes only. DBMS_OUTPUT.PUT_LINE('Last LOB chunk'); END IF; -- The following output lines are for debugging purposes only. DBMS_OUTPUT.PUT_LINE('lob offset: ' || lob_offset); DBMS_OUTPUT.PUT_LINE('Chunk size: ' || to_char(c_size)); lob_data := DBMS_LOB.SUBSTR(clob_data, c_size, lob_offset); -- create row unit for clob newunit := SYS.LCR$_ROW_UNIT(col_name, SYS.AnyData.ConvertVarChar2(lob_data), lob_flag, lob_offset, NULL); -- insert new LCR$_ROW_UNIT tnewvals(unit_pos) := newunit; -- enqueue lcr enq_row_lcr( source_dbname => src_dbname, cmd_type => 'LOB WRITE', obj_owner => tab_owner, obj_name => tab_name, old_vals => NULL, new_vals => tnewvals); -- calculate next chunk size lob_offset := lob_offset + c_size; IF ((final_size - lob_offset) < c_size) THEN c_size := final_size - lob_offset + 1; END IF; -- The following output line is for debugging purposes only. DBMS_OUTPUT.PUT_LINE('Next chunk size : ' || TO_CHAR(c_size)); IF (c_size < 1) THEN exit_flg := TRUE; END IF; EXIT WHEN exit_flg; END LOOP; RETURN 0; END do_enq_clob; / SHOW ERRORS /*
The DBMS_OUTPUT
lines in the following example can be used for debugging purposes if necessary. If they are not needed, then they can be commented out or deleted.
*/ SET SERVEROUTPUT ON SIZE 100000 DECLARE c1_data CLOB; c2_data CLOB; c3_data CLOB; newunit1 SYS.LCR$_ROW_UNIT; newunit2 SYS.LCR$_ROW_UNIT; newunit3 SYS.LCR$_ROW_UNIT; newunit4 SYS.LCR$_ROW_UNIT; newvals SYS.LCR$_ROW_LIST; big_data VARCHAR(22000); n NUMBER; BEGIN -- Create primary key for LCR$_ROW_UNIT newunit1 := SYS.LCR$_ROW_UNIT('A', Sys.AnyData.ConvertNumber(3), NULL, NULL, NULL); -- Create empty CLOBs newunit2 := sys.lcr$_row_unit('C1', Sys.AnyData.ConvertVarChar2(NULL), DBMS_LCR.EMPTY_LOB, NULL, NULL); newunit3 := SYS.LCR$_ROW_UNIT('C2', Sys.AnyData.ConvertVarChar2(NULL), DBMS_LCR.EMPTY_LOB, NULL, NULL); newunit4 := SYS.LCR$_ROW_UNIT('C3', Sys.AnyData.ConvertVarChar2(NULL), DBMS_LCR.EMPTY_LOB, NULL, NULL); newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3,newunit4); -- Perform an insert enq_row_lcr( source_dbname => 'MYDB.NET', cmd_type => 'INSERT', obj_owner => 'LOB_USER', obj_name => 'WITH_CLOB', old_vals => NULL, new_vals => newvals); -- construct clobs big_data := RPAD('Hello World', 1000, '_'); big_data := big_data || '#'; big_data := big_data || big_data || big_data || big_data || big_data; DBMS_LOB.CREATETEMPORARY( lob_loc => c1_data, cache => TRUE); DBMS_LOB.WRITEAPPEND( lob_loc => c1_data, amount => length(big_data), buffer => big_data); big_data := RPAD('1234567890#', 1000, '_'); big_data := big_data || big_data || big_data || big_data; DBMS_LOB.CREATETEMPORARY( lob_loc => c2_data, cache => TRUE); DBMS_LOB.WRITEAPPEND( lob_loc => c2_data, amount => length(big_data), buffer => big_data); big_data := RPAD('ASDFGHJKLQW', 2000, '_'); big_data := big_data || '#'; big_data := big_data || big_data || big_data || big_data || big_data; DBMS_LOB.CREATETEMPORARY( lob_loc => c3_data, cache => TRUE); DBMS_LOB.WRITEAPPEND( lob_loc => c3_data, amount => length(big_data), buffer => big_data); -- pk info newunit1 := SYS.LCR$_ROW_UNIT('A', SYS.AnyData.ConvertNumber(3), NULL, NULL, NULL); newvals := SYS.LCR$_ROW_LIST(newunit1); -- write c1 clob n := do_enq_clob( src_dbname => 'MYDB.NET', tab_owner => 'LOB_USER', tab_name => 'WITH_CLOB', col_name => 'C1', new_vals => newvals, clob_data => c1_data, offset => 1, chunk_size => 1024); DBMS_OUTPUT.PUT_LINE('n=' || n); -- write c2 clob newvals := SYS.LCR$_ROW_LIST(newunit1); n := do_enq_clob( src_dbname => 'MYDB.NET', tab_owner => 'LOB_USER', tab_name => 'WITH_CLOB', col_name => 'C2', new_vals => newvals, clob_data => c2_data, offset => 1, chunk_size => 2000); DBMS_OUTPUT.PUT_LINE('n=' || n); -- write c3 clob newvals := SYS.LCR$_ROW_LIST(newunit1); n := do_enq_clob(src_dbname=>'MYDB.NET', tab_owner => 'LOB_USER', tab_name => 'WITH_CLOB', col_name => 'C3', new_vals => newvals, clob_data => c3_data, offset => 1, chunk_size => 500); DBMS_OUTPUT.PUT_LINE('n=' || n); COMMIT; END; / /*
After this script completes, check the lob_construct.out spool file to ensure that all actions completed successfully.
*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/
After you run the script, you can check the lob_user.with_clob
table to list the rows applied by the apply process. The DBMS_LOCK.SLEEP
statement is used to give the apply process time to apply the enqueued rows.
CONNECT lob_user/lob_user_pw

EXECUTE DBMS_LOCK.SLEEP(10);

SELECT a, c1, c2, c3 FROM with_clob ORDER BY a;

SELECT a, LENGTH(c1), LENGTH(c2), LENGTH(c3) FROM with_clob ORDER BY a;
You can set or get the value of the tags generated by the current session or by an apply process. The following sections describe how to set and get tag values.
This section contains instructions for setting and getting the tag for the current session.
You can set the tag for all redo entries generated by the current session using the SET_TAG
procedure in the DBMS_STREAMS
package. For example, to set the tag to the hexadecimal value of '1D'
in the current session, run the following procedure:
BEGIN
  DBMS_STREAMS.SET_TAG(
    tag => HEXTORAW('1D'));
END;
/
After running this procedure, each redo entry generated by DML or DDL statements in the current session will have a tag value of 1D. Running this procedure affects only the current session.
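If you later want the redo entries generated by the session to have a NULL tag again, you can set the tag back to NULL. The following anonymous block is a minimal sketch of that call; it is an illustration rather than part of the original example:

BEGIN
  -- A NULL tag restores the default behavior for redo generated by this session
  DBMS_STREAMS.SET_TAG(
    tag => NULL);
END;
/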
You can get the tag for all redo entries generated by the current session using the GET_TAG function in the DBMS_STREAMS package. For example, to get the hexadecimal value of the tags generated in the redo entries for the current session, run the following:
SET SERVEROUTPUT ON

DECLARE
  raw_tag RAW(2048);
BEGIN
  raw_tag := DBMS_STREAMS.GET_TAG();
  DBMS_OUTPUT.PUT_LINE('Tag Value = ' || RAWTOHEX(raw_tag));
END;
/
You can also display the tag value for the current session by querying the DUAL table:

SELECT DBMS_STREAMS.GET_TAG FROM DUAL;
This section contains instructions for setting and removing the tag for an apply process.
An apply process generates redo entries when it applies changes to a database or invokes handlers. You can set the default tag for all redo entries generated by an apply process when you create the apply process using the CREATE_APPLY
procedure in the DBMS_APPLY_ADM
package, or when you alter an existing apply process using the ALTER_APPLY
procedure in the DBMS_APPLY_ADM
package. In both of these procedures, set the apply_tag
parameter to the value you want to specify for the tags generated by the apply process.
For example, to set the value of the tags generated in the redo log by an existing apply process named strm01_apply
to the hexadecimal value of '7'
, run the following procedure:
BEGIN
  DBMS_APPLY_ADM.ALTER_APPLY(
    apply_name => 'strm01_apply',
    apply_tag  => HEXTORAW('7'));
END;
/
After running this procedure, each redo entry generated by the apply process will have a tag value of 7.
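The tag can also be specified when the apply process is first created. The following CREATE_APPLY call is a minimal sketch that sets apply_tag at creation time; the queue and apply process names are hypothetical:

BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name => 'strmadmin.strm01_queue',
    apply_name => 'strm03_apply',
    apply_tag  => HEXTORAW('7'));
END;
/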
You remove the apply tag for an apply process by setting the remove_apply_tag
parameter to true
in the ALTER_APPLY
procedure in the DBMS_APPLY_ADM
package. Removing the apply tag means that each redo entry generated by the apply process has a NULL
tag. For example, the following procedure removes the apply tag from an apply process named strm02_apply
.
BEGIN
  DBMS_APPLY_ADM.ALTER_APPLY(
    apply_name       => 'strm02_apply',
    remove_apply_tag => true);
END;
/
Point-in-time recovery is the recovery of a database to a specified noncurrent time, SCN, or log sequence number. If point-in-time recovery is required at a destination database in a Streams environment, then you must reapply the captured changes that had already been applied after the point-in-time of the recovery.
For each relevant capture process, you can choose either of the following methods to perform point-in-time recovery at a destination database in a Streams environment:

- Reset the start SCN for the existing capture process.
- Create a new capture process.
Resetting the start SCN for the capture process is simpler than creating a new capture process. However, if the capture process captures changes that are applied at multiple destination databases, then the changes are resent to all the destination databases, including the ones that did not perform point-in-time recovery. If a change is already applied at a destination database, then it is discarded by the apply process, but you may not want to use the network and computer resources required to resend the changes to multiple destination databases. In this case, you can create and temporarily use a new capture process and a new propagation that propagates changes only to the destination database that was recovered.
The following sections provide instructions for each task:
If there are multiple apply processes at the destination database where you performed point-in-time recovery, then complete one of the tasks in this section for each apply process.
Neither of these methods should be used if any of the following conditions are true regarding the destination database you are recovering:
If any of these conditions are true in your environment, then you cannot use the methods described in this section. Instead, you must manually resynchronize the data at all destination databases.
If you decide to reset the start SCN for the existing capture process to perform point-in-time recovery, then complete the following steps:
Drop the propagation that propagates changes to the destination queue at the destination database. Use the DROP_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to drop the propagation; a sketch of this call follows the note below.

If you are using directed networks, and there are intermediate databases between the source database and destination database, then drop the propagation at each intermediate database in the path to the destination database, including the propagation at the source database.
Note: You must drop the appropriate propagation(s). Disabling them is not sufficient. You will re-create the propagation(s) in Step 6, and dropping them now ensures that only events created after resetting the start SCN for the capture process are propagated.
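The following is a minimal sketch of the drop call, assuming a propagation named strm01_propagation; substitute the name of your propagation.

BEGIN
  DBMS_PROPAGATION_ADM.DROP_PROPAGATION(
    propagation_name => 'strm01_propagation');
END;
/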
At the destination database, determine the oldest message number from the source database for the apply process by querying the DBA_APPLY_PROGRESS data dictionary view, and record the value. The following statement is an example of the query to perform:
SELECT APPLY_NAME, OLDEST_MESSAGE_NUMBER FROM DBA_APPLY_PROGRESS;
Stop the capture process using the STOP_CAPTURE procedure in the DBMS_CAPTURE_ADM package, as shown in the sketch that follows.
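A minimal sketch of this call, assuming the capture process is named strm01_capture (the name used in the next step's example):

EXEC DBMS_CAPTURE_ADM.STOP_CAPTURE('strm01_capture');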
To reset the start SCN for an existing capture process, run the ALTER_CAPTURE procedure in the DBMS_CAPTURE_ADM package and set the start_scn parameter to the value you recorded from the query in Step 3. For example, to reset the start SCN for a capture process named strm01_capture to the value 829381993, run the following ALTER_CAPTURE procedure:
BEGIN
  DBMS_CAPTURE_ADM.ALTER_CAPTURE(
    capture_name => 'strm01_capture',
    start_scn    => 829381993);
END;
/
Re-create the propagation(s) that you dropped in Step 1 using the CREATE_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package. Specify the rule set used by the original propagation for the rule_set_name parameter when you create the propagation; a sketch of this call follows below.

If you are using directed networks, and there are intermediate databases between the source database and destination database, then create a new propagation at each intermediate database in the path to the destination database, including the propagation at the source database.
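The following is a minimal sketch of the call, with hypothetical propagation, queue, database link, and rule set names; substitute the names used by your original propagation.

BEGIN
  DBMS_PROPAGATION_ADM.CREATE_PROPAGATION(
    propagation_name   => 'strm01_propagation',
    source_queue       => 'strmadmin.streams_queue',
    destination_queue  => 'strmadmin.streams_queue',
    destination_dblink => 'dest.net',
    rule_set_name      => 'strmadmin.strm01_rule_set');
END;
/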
Start the capture process using the START_CAPTURE procedure in the DBMS_CAPTURE_ADM package, as shown in the sketch below.
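A minimal sketch, again assuming the capture process is named strm01_capture:

EXEC DBMS_CAPTURE_ADM.START_CAPTURE('strm01_capture');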
If you decide to create a new capture process to perform point-in-time recovery, then complete the following steps:
Drop the propagation that propagates changes to the destination queue at the destination database. Use the DROP_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to drop the propagation.

If you are using directed networks, and there are intermediate databases between the source database and destination database, then drop the propagation that propagates events between the last intermediate database and the destination database. You do not need to drop the propagations at the other intermediate databases or at the source database.
At the destination database, determine the oldest message number from the source database for the apply process by querying the DBA_APPLY_PROGRESS data dictionary view, and record the value. The following statement is an example of the query to perform:
SELECT APPLY_NAME, OLDEST_MESSAGE_NUMBER FROM DBA_APPLY_PROGRESS;
Create a queue at the source database to be used by the new capture process, using the SET_UP_QUEUE procedure in the DBMS_STREAMS_ADM package; a sketch of this call follows below.

If you are using directed networks, and there are intermediate databases between the source database and destination database, then create a queue at each intermediate database in the path to the destination database, including the new queue at the source database. Do not create a new queue at the destination database.
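A minimal sketch of the SET_UP_QUEUE call, with hypothetical queue table and queue names:

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table => 'strm_recover_queue_table',
    queue_name  => 'strm_recover_queue');
END;
/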
Create a propagation that propagates changes from the new queue at the source database to the queue at the destination database, using the CREATE_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package. Specify the rule set used by the original propagation for the rule_set_name parameter when you create the propagation.

If you are using directed networks, and there are intermediate databases between the source database and destination database, then create a propagation at each intermediate database in the path to the destination database, including the propagation from the source database to the first intermediate database. These propagations propagate changes captured by the capture process you will create in Step 6 between the queues created in Step 4.
Create a new capture process at the source database using the CREATE_CAPTURE procedure in the DBMS_CAPTURE_ADM package. Set the source_queue parameter to the local queue you created in Step 4, the rule_set_name parameter to the rule set used by the original capture process, and the start_scn parameter to the value you recorded from the query in Step 3. If the rule set used by the original capture process captures events that should not be sent to the destination database that was recovered, then you can create and use a smaller, customized rule set that shares some rules with the original rule set.
Start the new capture process using the START_CAPTURE procedure in the DBMS_CAPTURE_ADM package.

Stop the capture process using the STOP_CAPTURE procedure in the DBMS_CAPTURE_ADM package once the recovered destination database has caught up. The following queries can help you determine when this condition is met.
At the destination database, you can use the following query to determine the oldest message number from the source database for the apply process:
SELECT APPLY_NAME, OLDEST_MESSAGE_NUMBER FROM DBA_APPLY_PROGRESS;
At the source database, you can use the following query to determine the capture number of the original capture process:
SELECT CAPTURE_NAME, CAPTURE_MESSAGE_NUMBER FROM V$STREAMS_CAPTURE;
If you are using directed networks, and there are intermediate databases between the source database and destination database, then drop the new propagation at each intermediate database in the path to the destination database, including the new propagation at the source database.
If you are using directed networks, and there are intermediate databases between the source database and destination database, then drop the new queue at each intermediate database in the path to the destination database, including the new queue at the source database. Do not drop the queue at the destination database.
Re-create the propagation that you dropped in Step 1, using the CREATE_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to create the propagation. Specify the rule set used by the original propagation for the rule_set_name parameter when you create the propagation.

If you are using directed networks, and there are intermediate databases between the source database and destination database, then re-create the propagation from the last intermediate database to the destination database. You dropped this propagation in Step 1.
All of the steps after Step 7 can be deferred to a later time, or they can be done as soon as the condition described in Step 8 is met.
This section describes how to perform a full database export/import on a database that is running one or more Streams capture processes, propagations, or apply processes. These instructions pertain to a full database export/import where the import database and export database are running on different computers, and the import database replaces the export database. The global name of the import database and the global name of the export database must match.
Note: If you want to add a database to an existing Streams environment, then do not use the instructions in this section. Instead, see "Configuring a Capture-Based Streams Environment".
Complete the following steps to perform a full database export/import on a database that is using Streams:
Disable any relevant propagation jobs using the DISABLE_PROPAGATION_SCHEDULE procedure in the DBMS_AQADM package; a sketch of this call follows below.

To complete this step, you may need to re-create the database links used by these propagation jobs or modify your Oracle networking files at the databases that contain the source queues.
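The following is a minimal sketch of disabling one propagation schedule; the source queue name and destination database link are hypothetical.

BEGIN
  DBMS_AQADM.DISABLE_PROPAGATION_SCHEDULE(
    queue_name  => 'strmadmin.streams_queue',
    destination => 'dest.net');
END;
/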
Determine the current SCN of the export database and record the value, using the GET_SYSTEM_CHANGE_NUMBER function in the DBMS_FLASHBACK package; a sketch of this call follows below.
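The following anonymous block is a minimal sketch that prints the current SCN; it assumes SERVEROUTPUT is enabled in the session.

SET SERVEROUTPUT ON

DECLARE
  current_scn NUMBER;
BEGIN
  -- Record this value; later steps compare against it
  current_scn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
  DBMS_OUTPUT.PUT_LINE('Current SCN: ' || current_scn);
END;
/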
After completing this step, do not stop any capture process running on the export database. Step 10 instructs you to use the V$STREAMS_CAPTURE
dynamic performance view to ensure that no DML or DDL changes were made to the database after Step 3. The information about a capture process in this view is reset if the capture process is stopped and restarted.
For the check in Step 10 to be valid, this information should not be reset for any capture process. To prevent a capture process from stopping automatically, you may need to set the message_limit
and time_limit
capture process parameters to infinite
if these parameters are set to another value for any capture process.
If the export database is not running any apply processes and is not propagating user-enqueued events, then start the full database export now. Make sure the FULL export parameter is set to y so that the required Streams metadata is exported.
If the export database is running one or more apply processes or is propagating user-enqueued events, then do not start the export and proceed to the next step.
You can view the applied SCN for each capture process by querying the APPLIED_SCN
column in the DBA_CAPTURE
data dictionary view.
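For example, a query along the following lines (an illustrative sketch) shows the applied SCN for each capture process:

SELECT CAPTURE_NAME, APPLIED_SCN FROM DBA_CAPTURE;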
If the export database is propagating user-enqueued events, then disable the propagation jobs that are propagating these events using the DISABLE_PROPAGATION_SCHEDULE procedure in the DBMS_AQADM package.

Start the full database export now if you did not start it in Step 5. Make sure the FULL export parameter is set to y so that the required Streams metadata is exported. If you already started the export in Step 5, then proceed to Step 9.

Determine the current SCN of the export database again using the GET_SYSTEM_CHANGE_NUMBER function in the DBMS_FLASHBACK package. This SCN will be called the new SCN.

For each capture process, view the capture message number by querying the CAPTURE_MESSAGE_NUMBER column in the V$STREAMS_CAPTURE dynamic performance view, and view the enqueue message number by querying the ENQUEUE_MESSAGE_NUMBER column in the V$STREAMS_CAPTURE dynamic performance view.
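For example, a query along these lines (an illustrative sketch, not part of the original steps) returns both values for each capture process:

SELECT CAPTURE_NAME, CAPTURE_MESSAGE_NUMBER, ENQUEUE_MESSAGE_NUMBER
  FROM V$STREAMS_CAPTURE;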
If the enqueue message number of each capture process is less than or equal to the SCN determined in Step 4, then proceed to Step 11.
However, if the enqueue message number of any capture process is higher than the SCN determined in Step 4, then one or more DML or DDL changes were made after the SCN determined in Step 4, and these changes were captured and enqueued by a capture process. In this case, perform all of the steps in this section again, starting with Step 1.
Note: For this verification to be valid, each capture process must have been running uninterrupted since Step 4.
Perform the full database import. Make sure the STREAMS_CONFIGURATION and FULL import parameters are both set to y so that the required Streams metadata is imported. The default setting is y for the STREAMS_CONFIGURATION import parameter. Also, make sure no DML or DDL changes are made to the import database during the import.

If you reset the value of a message_limit or time_limit capture process parameter in Step 4, then reset these parameters to their original settings.
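For example, the following is a minimal sketch of restoring one such parameter with the SET_PARAMETER procedure in the DBMS_CAPTURE_ADM package; the capture process name and the original value shown are hypothetical.

BEGIN
  -- Restore time_limit to the value it had before Step 4 (value shown is illustrative)
  DBMS_CAPTURE_ADM.SET_PARAMETER(
    capture_name => 'strm01_capture',
    parameter    => 'time_limit',
    value        => '3600');
END;
/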