Oracle9i Streams Release 2 (9.2) Part Number A96571-02 |
|
|
View PDF |
This chapter explains the concepts relating to staging events in a queue and propagating events from one queue to another.
This chapter contains these topics:
Streams uses queues of type SYS.AnyData
to stage events. There are two types of events that can be staged in a Streams queue: logical change records (LCRs) and user messages. LCRs are objects that contain information about a change to a database object, while user messages are custom messages created by users or applications. Both types of events are of type SYS.AnyData
and can be used for information sharing within a single database or between databases.
Staged events can be consumed or propagated, or both. These events can be consumed by an apply process or by a user application that explicitly dequeues them. Even after an event is consumed, it may remain in the queue if you have also configured Streams to propagate the event to one or more other queues or if message retention is specified. These other queues may reside in the same database or in different databases. In either case, the queue from which the events are propagated is called the source queue, and the queue that receives the events is called the destination queue. There can be a one-to-many, many-to-one, or many-to-many relationship between source and destination queues. Figure 3-1 shows propagation from a source queue to a destination queue.
You can create, alter, and drop a propagation, and you can define propagation rules that control which events are propagated. The user who owns the source queue is the user who propagates events. This user must have the necessary privileges to propagate events. These privileges include the following:
If the propagation propagates events to a destination queue in a remote database, then the owner of the source queue must be able to use the propagation's database link, and the user to which the database link connects at the remote database must have enqueue privilege on the destination queue.
See Also:
Oracle9i Application Developer's Guide - Advanced Queuing for more information about message retention |
Events can be enqueued in two ways:
SYS.AnyData
. These user messages can contain LCRs or any other type of message. Any user message that was explicitly enqueued by a user or an application is called a user-enqueued event. Events that were enqueued by a user procedure called from an apply process are also user-enqueued events.So, each captured event contains an LCR, but a user-enqueued event may or may not contain an LCR. Propagating a captured event or a user-enqueued event enqueues the event into the destination queue.
Events can be dequeued in two ways:
The dequeued events may have originated at the same database where they are dequeued, or they may have originated at a different database.
See Also:
|
You can use Streams to configure event propagation between two queues, which may reside in different databases. Streams uses job queues to propagate events.
A propagation is always between a source queue and a destination queue. Although propagation is always between two queues, a single queue may participate in many propagations. That is, a single source queue may propagate events to multiple destination queues, and a single destination queue may receive events from multiple source queues. However, only one propagation is allowed between a particular source queue and a particular destination queue. Also, a single queue may be a destination queue for some propagations and a source queue for other propagations.
A propagation may propagate all of the events in a source queue to the destination queue, or a propagation may propagate only a subset of the events. Also, a single propagation can propagate both captured and user-enqueued events. You can use rules to control which events in the source queue are propagated to the destination queue.
Depending on how you set up your Streams environment, changes could be sent back to the site where they originated. You need to ensure that your environment is configured to avoid cycling the change in an endless loop. You can use Streams tags to avoid such a change cycling loop.
See Also:
|
A propagation propagates events based on rules that you define. For LCR events, each rule specifies the database objects for which the propagation propagates changes and the types of changes to propagate. You can specify propagation rules for LCR events at the following levels:
For non-LCR events, you can create your own rules to control propagation.
A queue subscriber that specifies a condition causes the system to generate a rule. The rule sets for all subscribers to a queue are combined into a single system-generated rule set to make subscription more efficient.
A user-enqueued event is propagated successfully to a destination queue when the enqueue into the destination queue is committed. A captured event is propagated successfully to a destination queue when both of the following actions are completed:
When an event is successfully propagated between two Streams queues, the destination queue acknowledges successful propagation of the event. If the source queue is configured to propagate an event to multiple destination queues, then the event remains in the source queue until each destination queue has sent confirmation of event propagation to the source queue. When each destination queue acknowledges successful propagation of the event, and all local consumers in the source queue database have consumed the event, the source queue can drop the event.
This confirmation system ensures that events are always propagated from the source queue to the destination queue, but, in some configurations, the source queue can grow larger than an optimal size. When a source queue grows, it uses more SGA memory and may use more disk space.
There are two common reasons for source-queue growth:
A directed network is one in which propagated events may pass through one or more intermediate databases before arriving at a destination database. An event may or may not be processed by an apply process at an intermediate database. Using Streams, you can choose which events are propagated to each destination database, and you can specify the route that events will traverse on their way to a destination database. Figure 3-2 shows an example of a directed networks environment.
The advantage of using a directed network is that a source database need not have a physical network connection with the destination database. So, if you want events to propagate from one database to another, but there is no direct network connection between the computers running these databases, then you can still propagate the events without reconfiguring your network, as long as one or more intermediate databases connect the source database to the destination database.
If you use directed networks, and an intermediate site goes down for an extended period of time or is removed, then you may need to reconfigure the network and the Streams environment.
An intermediate database in a directed network may propagate events using queue forwarding or apply forwarding. Queue forwarding means that the events being forwarded at an intermediate database are the events received by the intermediate database. The source database for an event is the database where the event originated.
Apply forwarding means that the events being forwarded at an intermediate database are first processed by an apply process. These events are then recaptured by a capture process at the intermediate database and forwarded. When you use apply forwarding, the intermediate database becomes the new source database for the events because the events are recaptured there.
Consider the following differences between queue forwarding and apply forwarding when you plan your Streams environment:
A single Streams environment may use a combination of queue forwarding and apply forwarding.
Queue forwarding has the following advantages compared to apply forwarding:
GET_SOURCE_DATABASE_NAME
member procedure on the LCR contained in the event. If you use apply forwarding, then determining the origin of an event requires the use of Streams tags and apply handlers.If you use apply forwarding, then substantially more work may be required to reconfigure end-to-end capture, propagation, and apply of events, because the destination database(s) downstream from the unavailable intermediate database were using the SCN information of this intermediate database. Without this SCN information, the destination databases cannot apply the changes properly.
Apply forwarding has the following advantages compared to queue forwarding:
See Also:
|
Streams enables messaging with queues of type SYS.AnyData
. These queues are called Streams queues. Streams queues can stage user messages whose payloads are of SYS.AnyData
type. A SYS.AnyData
payload can be a wrapper for payloads of different datatypes. A queue that can stage messages of only a particular type are called typed queues.
Using SYS.AnyData
wrappers for message payloads, publishing applications can enqueue messages of different types into a single queue, and subscribing applications can dequeue these messages, either explicitly using a dequeue API or implicitly using an apply process. If the subscribing application is remote, then the messages can be propagated to the remote site, and the subscribing application can dequeue the messages from a local queue in the remote database. Alternatively, a remote subscribing application can dequeue messages directly from the source queue using a variety of standard protocols, such as PL/SQL and OCI.
Streams interoperates with Advanced Queuing (AQ), which supports all the standard features of message queuing systems, including multiconsumer queues, publish and subscribe, content-based routing, internet propagation, transformations, and gateways to other messaging subsystems.
You can wrap almost any type of payload in a SYS.AnyData
payload. To do this, you use the Convert
data_type
static functions of the SYS.AnyData
type, where data_type
is the type of object to wrap. These functions take the object as input and return a SYS.AnyData
object.
The following datatypes cannot be wrapped in a SYS.AnyData
wrapper:
The following datatypes can be directly wrapped in a SYS.AnyData
wrapper, but these datatypes cannot be present in a user-defined type payload wrapped in a SYS.AnyData
wrapper:
CLOB
BLOB
BFILE
VARRAY
See Also:
|
Your applications can use the following programmatic environments to enqueue user messages into a Streams queue and dequeue user messages from a Streams queue:
The following sections provide information about using these interfaces to enqueue user messages into and dequeue user messages from a Streams queue.
See Also:
Oracle9i Application Developer's Guide - Advanced Queuing for more information about these programmatic interfaces |
To enqueue a user message containing an LCR into a Streams queue using PL/SQL, first create the LCR to be enqueued. You use the constructor for the SYS.LCR$_ROW_RECORD
type to create a row LCR, and you use the constructor for the SYS.LCR$_DDL_RECORD
type to create a DDL LCR. Then you use the SYS.AnyData.ConvertObject
function to convert the LCR into SYS.AnyData
payload and enqueue it using the DBMS_AQ.ENQUEUE
procedure.
To enqueue a user message containing a non-LCR object into a Streams queue using PL/SQL, you use one of the SYS.AnyData.Convert*
functions to convert the object into SYS.AnyData
payload and enqueue it using the DBMS_AQ.ENQUEUE
procedure.
To enqueue a user message containing an LCR into a Streams queue using JMS or OCI, you must represent the LCR in XML format. To construct an LCR, use the oracle.xdb.XMLType
class. LCRs are defined in the SYS
schema. The LCR schema must be loaded into the SYS
schema using the catxlcr.sql
script in Oracle home in the rdbms/admin/
directory.
To enqueue a message using OCI, perform the same actions that you would to enqueue a message into a typed queue. A typed queue is a queue that can stage messages of a particular type only. To enqueue a message using JMS, a user must have EXECUTE
privilege on DBMS_AQ
, DBMS_AQIN
, and DBMS_AQJMS
packages.
A non-LCR user message can be a message of any user-defined type or a JMS type. The JMS types include the following:
javax.jms.TextMessage
javax.jms.MapMessage
javax.jms.StreamMessage
javax.jms.ObjectMessage
javax.jms.BytesMessage
When using user-defined types, you must generate the Java class for the message using Jpublisher, which implements the ORAData
interface. To enqueue a message into a Streams queue, you can use methods QueueSender.send
or TopicPublisher.publish
.
See Also:
|
To dequeue a user message from Streams queue using PL/SQL, you use the DBMS_AQ.DEQUEUE
procedure and specify SYS.AnyData
as the payload. The user message may contain an LCR or another type of object.
In a Streams queue, user messages containing LCRs in XML format are represented as oracle.xdb.XMLType
. Non-LCR messages can be one of the following formats:
javax.jms.TextMessage
, javax.jms.MapMessage
, javax.jms.StreamMessage
, javax.jms.ObjectMessage
, or javax.jms.BytesMessage
)To dequeue a message from a Streams queue using JMS, you can use methods QueueReceiver
, TopicSubscriber
, or TopicReceiver
. Because the queue may contain different types of objects wrapped in a SYS.AnyData
wrapper, you must register a list of SQL types and their corresponding Java classes in the typemap of the JMS session. JMS types are already preregistered in the typemap.
For example, suppose a queue contains LCR messages represented as oracle.xdb.XMLType
and messages of type person
and address
. The classes JPerson.java
and JAddress.java
are the ORAData
mappings for person
and address
, respectively. Before dequeuing the message, the type map must be populated as follows:
java.util.Dictionary map = ((AQjmsSession)q_sess).getTypeMap(); map.put("SCOTT.PERSON", Class.forName("JPerson")); map.put("SCOTT.ADDRESS", Class.forName("JAddress")); map.put("SYS.XMLTYPE", Class.forName("oracle.xdb.XMLType")); // For LCRs
When using message selectors with QueueReceiver
or TopicPublisher
, the selector can contain any SQL92 expression that has a combination of one or more of the following:
JMSPriority
, JMSCorrelationID
, JMSType
, JMSXUserI
, JMSXAppID
, JMSXGroupID
, and JMSXGroupSeq
. The following is an example of a JMS message field:
JMSPriority < 3 AND JMSCorrelationID = 'Fiction'
color IN ('RED', 'BLUE', 'GREEN') AND price < 30000
hr.GET_TYPE(tab.user_data) = 'HR.EMPLOYEES'
To dequeue a message using OCI, perform the same actions that you would to dequeue a message from a typed queue.
See Also:
|
SYS.AnyData
queues can interoperate with typed queues in a Streams environment. A typed queue can stage messages of a particular type only. Table 3-1 shows the types of propagation possible between queues.
Source Queue | Destination Queue | Transformation |
---|---|---|
|
|
None |
Typed |
|
Note: Propagation is possible only if the messages in the typed queue meet the restrictions outlined in "User-Defined Type Messages". |
|
Typed |
Requires a rule to filter messages and a user-defined transformation |
Typed |
Typed |
Follows Advanced Queuing (AQ) rules (see Oracle9i Application Developer's Guide - Advanced Queuing for information) |
To propagate messages containing a payload of a certain type from a SYS.AnyData
source queue to a typed destination queue, you must perform a transformation. Only messages containing a payload of the same type as the typed queue can be propagated to the typed queue.
Although you cannot use Simple Object Access Protocol (SOAP) to interact directly with a Streams queue, you can use SOAP with Streams by propagating messages between a Streams queue and a typed queue. If you want to enqueue a message into a Streams queue using SOAP, then you can configure propagation from a typed queue to Streams queue. Then, you can use SOAP to enqueue a message into the typed queue. The message will be propagated automatically from the typed queue to the Streams queue.
If you want to use SOAP to dequeue a message that is in a Streams queue, then you can configure propagation from a Streams queue to a typed queue. The message will be propagated automatically from the Streams queue to the typed queue. Then, the message would be available for access using SOAP.
Note: Certain Streams capabilities, such as capturing changes using a capture process and applying changes with an apply process, can be configured only with |
See Also:
|
If you plan to enqueue, propagate, or dequeue user-defined type messages in a Streams environment, then each type used in these messages must exist at every database where the message may be staged in a queue. Some environments use directed networks to route messages through intermediate databases before they reach their destination. In such environments, the type must exist at each intermediate database, even if the messages of this type are never enqueued or dequeued at a particular intermediate database.
In addition, the following requirements must be met for such types:
The object identifier (OID) need not match at each database.
See Also:
|
You can configure a Streams queue to stage and propagate captured and user-enqueued events in a Real Application Clusters environment. In a Real Application Clusters environment, only the owner instance may have a buffer for a queue. Different instances may have buffers for different queues. Queue buffers are discussed later in this chapter. A queue buffer is System Global Area (SGA) memory associated with a Streams queue that contains only captured events.
A Streams queue that contains only user-enqueued events behaves the same as a typed queue in a Real Application Clusters environment. However, if a Streams queue contains or will contain captured events in a Real Application Clusters environment, then the environment must meet the following requirements:
SET_UP_QUEUE
procedure in the DBMS_STREAMS_ADM
package. Creating or altering a queue table with the DBMS_AQADM
package is not supported if any queue in the queue table contains captured events.AQ_TM_PROCESSES
initialization parameter must be set to at least 1
on each instance.If the owner instance for a queue table containing a destination queue becomes unavailable, then queue ownership is transferred automatically to another instance in the cluster. If this happens, then database links from remote source queues must be reconfigured manually to connect to the instance that owns the destination queue. The DBA_QUEUE_TABLES
data dictionary view contains information about the owner instance for a queue table. A queue table may contain multiple queues. In this case, each queue in a queue table has the same owner instance as the queue table.
See Also:
|
In general, Streams queues and propagations use the infrastructure of AQ. However, unlike an AQ queue, which stages all events in a queue table, a Streams queue has a queue buffer to stage captured events in shared memory. This section describes queue buffers and discusses how queue buffers are used in a Real Application Clusters environment. This section also discusses propagation jobs and secure queues, and how they are used in Streams. In addition, this section discusses how transactional queues handle captured and user-enqueued events, as well as the need for a Streams data dictionary at databases that propagate captured events.
See Also:
|
A queue buffer is System Global Area (SGA) memory associated with a Streams queue that contains only captured events. A queue buffer enables Oracle to optimize captured events by buffering captured events in the SGA instead of always storing them in a queue table. This buffering of captured events happens in any database where captured events are staged in a Streams queue. Such a database may be a source database, an intermediate database, or a destination database. User-enqueued LCR events and user-enqueued non-LCR events are always staged in queue tables, not in queue buffers.
Queue buffers improve performance, but the contents of a queue buffer are lost if the instance containing the buffer shuts down normally or abnormally. Streams automatically recovers from these cases, assuming full database recovery is performed on the instance.
In a single database, all of the queue buffers combined can use up to 10% of SGA memory. A queue buffer may overflow if there is not enough shared memory available to hold captured events. Captured events that overflow a queue buffer are stored in the appropriate AQ$_
queue_table_name
_p
table, where queue_table_name
is the name of the queue table for the queue. If the events in a queue buffer are lost, the events spilled from the queue buffer are subsequently deleted in order to keep the queue buffer and its queue table in sync. Also, when a transaction is moved to an exception queue, all events in the transaction are staged in a queue table, not in a queue buffer.
See Also:
|
A Streams propagation is configured internally using the DBMS_JOBS
package. Therefore, a propagation job is the mechanism that propagates events from a source queue to a destination queue. Like other jobs configured using the DBMS_JOBS
package, propagation jobs have an owner, and they use job queue processes (J
nnn
) as needed to execute jobs.
A propagation job may be used by more than one propagation. All destination queues at a database receive events from a single source queue through a single propagation job. By using a single propagation job for multiple destination queues, Streams ensures that an event is sent to a destination database only once, even if the same message is received by multiple destination queues in the same database. Communication resources are conserved because messages are not sent more than once to the same database.
A propagation schedule specifies how often a propagation job propagates events from a source queue to a destination queue. Therefore, all propagations that use a propagation job have the same propagation schedule. A default propagation schedule is established for the new propagation job when you create the propagation job using one of the following procedures:
ADD_GLOBAL_PROPAGATION_RULE
procedure in the DBMS_STREAMS_ADM
packageADD_SCHEMA_PROPAGATION_RULE
procedure in the DBMS_STREAMS_ADM
packageADD_TABLE_PROPAGATION_RULE
procedure in the DBMS_STREAMS_ADM
packageCREATE_PROPAGATION
procedure in the DBMS_PROPAGATION_ADM
packageThe default schedule has the following properties:
SYSDATE()
.NULL
, which means infinite.NULL
, which means that propagation restarts as soon as it finishes the current duration.If you want to alter the default schedule for a propagation job, then use the ALTER_PROPAGATION_SCHEDULE
procedure in the DBMS_AQADM
package.
When the restricted session is enabled during system startup by issuing a STARTUP
RESTRICT
statement, propagation jobs with enabled propagation schedules do not propagate events. When the restricted session is disabled, each propagation schedule that is enabled and ready to run will run when there is an available job queue process.
When the restricted session is enabled in a running database by the SQL statement ALTER
SYSTEM
with the ENABLE
RESTRICTED
SESSION
clause, any running propagation job continues to run to completion. However, any new propagation job submitted for a propagation schedule is not started. Therefore, propagation for an enabled schedule may eventually come to a halt.
Secure queues are queues for which AQ agents must be explicitly associated with one or more database users who can perform queue operations, such as enqueue and dequeue. The owner of a secure queue can perform all queue operations on the queue, but other users cannot perform queue operations on a secure queue unless they are configured as secure queue users. In Streams, secure queues can be used to ensure that only the appropriate users and Streams processes enqueue events into a queue and dequeue events from a queue.
All Streams queues created using the SET_UP_QUEUE
procedure in the DBMS_STREAMS_ADM
package are secure queues. When you use the SET_UP_QUEUE
procedure to create a queue, any user specified by the queue_user
parameter is configured as a secure queue user of the queue automatically, if possible. The queue user is also granted ENQUEUE
and DEQUEUE
privileges on the queue. To enqueue events into and dequeue events from a queue, a queue user must also have EXECUTE
privilege on the DBMS_AQ
package. The SET_UP_QUEUE
procedure does not grant this privilege.
To configure the queue user as a secure queue user, the SET_UP_QUEUE
procedure creates an AQ agent with the same name as the user name, if one does not already exist. The user must use this agent to perform queue operations on the queue. If an agent with this name already exists and is associated with the queue user only, then it is used. SET_UP_QUEUE
then runs the ENABLE_DB_ACCESS
procedure in the DBMS_AQADM
package, specifying the agent and the user. If the agent that SET_UP_QUEUE
tries to create already exists and is associated with a user other than the user specified by queue_user
, then an error is raised. In this case, rename or remove the existing agent using the ALTER_AQ_AGENT
or DROP_AQ_AGENT
procedure, respectively, in the DBMS_AQADM
package. Then, retry SET_UP_QUEUE
.
When you create a capture process or an apply process, an AQ agent of the secure queue associated with the Streams process is configured automatically, and the user who runs the Streams process is specified as a secure queue user for this queue automatically. Therefore, a capture process is configured to enqueue into its secure queue automatically, and an apply process is configured to dequeue from its secure queue automatically.
For a capture process, the user who invokes the procedure that creates the capture process is the user who runs the capture process. For an apply process, the user specified as the apply_user
is the user who runs the apply process. If no apply_user
is specified, then the user who invokes the procedure that creates the apply process is the user who runs the apply process.
Also, if you change the apply_user
for an apply process using the ALTER_APPLY
procedure in the DBMS_APPLY_ADM
package, then the specified apply_user
is configured as a secure queue user of the queue used by the apply process. However, the old apply user remains configured as a secure queue user of the queue. To remove the old apply user, run the DISABLE_DB_ACCESS
procedure in the DBMS_AQADM
package, specifying the old apply user and the relevant AQ agent. You may also want to drop the agent if it is no longer needed. You can view the AQ agents and their associated users by querying the DBA_AQ_AGENT_PRIVS
data dictionary view.
If you create a SYS.AnyData
queue using the DBMS_AQADM
package, then you use the secure
parameter when you run the CREATE_QUEUE_TABLE
procedure to specify whether the queue is secure or not. The queue is secure if you specify true
for the secure
parameter when you run this procedure. When you use the DBMS_AQADM
package to create a secure queue, and you want to allow users to perform queue operations on the secure queue, then you must configure these secure queue users manually.
If you use the SET_UP_QUEUE
procedure in the DBMS_STREAMS_ADM
package to create a secure queue, and you want a user who is not the queue owner and who was not specified by the queue_user
parameter to perform operations on the queue, then you can configure the user as a secure queue user of the queue manually. Alternatively, you can run the SET_UP_QUEUE
procedure again and specify a different queue_user
for the queue. In this case, SET_UP_QUEUE
will skip queue creation, but it will configure the user specified by queue_user
as a secure queue user of the queue.
If you drop a capture process or an apply process, then the users who were configured as secure queue users for these processes remain secure queue users of the queue. To remove these users as secure queue users, run the DISABLE_DB_ACCESS
procedure in the DBMS_AQADM
package for each user. You may also want to drop the agent if it is no longer needed.
See Also:
|
A transactional queue is one in which user-enqueued events can be grouped into a set that are applied as one transaction. That is, an apply process performs a COMMIT
after it applies all the user-enqueued events in a group. The SET_UP_QUEUE
procedure in the DBMS_STREMS_ADM
package always creates a transactional queue.
A nontransactional queue is one in which each user-enqueued event is its own transaction. That is, an apply process performs a COMMIT
after each user-enqueued event it applies. In either case, the user-enqueued events may or may not contain user-created LCRs.
The difference between transactional and nontransactional queues is important only for user-enqueued events. An apply process always applies captured events in transactions that preserve the transactions executed at the source database. Table 3-2 shows apply process behavior for each type of event and each type of queue.
See Also:
|
When a capture process is created, a duplicate data dictionary called the Streams data dictionary is populated automatically. The Streams data dictionary is a multiversioned copy of some of the information in the primary data dictionary at a source database. The Streams data dictionary maps object numbers, object version information, and internal column numbers from the source database into table names, column names, and column datatypes when a capture process evaluates rules and creates LCRs. This mapping keeps each captured event as small as possible because the event can store numbers rather than names.
The mapping information in the Streams data dictionary at the source database may be needed to evaluate rules at any database that propagates the captured events from the source database. To make this mapping information available to a propagation, Oracle automatically populates a multiversioned Streams data dictionary at each site that has a Streams propagation. Oracle automatically sends internal messages that contain relevant information from the Streams data dictionary at the source database to all other databases that receive captured events from the source database.
The Streams data dictionary information contained in these internal messages in a queue may or may not be propagated by a propagation. Which Streams data dictionary information to propagate depends on the rule set for the propagation. When a propagation encounters Streams data dictionary information for a table, the propagation rule set is evaluated with partial information that includes the source database name, table name, and table owner.
If at least one rule in the rule set either evaluates to TRUE
(true_rules
) or could evaluate to TRUE
given more information (maybe_rules
), then the Streams data dictionary information is propagated. This rule can be either a DML rule or a DDL rule.
When Streams data dictionary information is propagated to a destination queue, it is incorporated into the Streams data dictionary at the database that contains the destination queue, in addition to being enqueued into the destination queue. Therefore, a propagation reading the destination queue in a directed networks configuration can forward LCRs immediately without waiting for the Streams data dictionary to be populated.