Never use DDL or DML against tables created by dbms_aqadm.create_queue_table
. Only use dbms_aqadm and dbms_aq to work with these tables. Oracle may make several supporting tables, indexes, etc that you will not be aware of. Manually running DDL or DML against the table may lead you to a scenario where Oracle Support will need you to drop and recreate the table and queues to resolve the situation.
It's strongly recommended you do not use dbms_aq.forever
for a wait option. This has caused issues in the past as Oracle may start scheduling an excessive number of worker jobs to work the queues that are unnecessary (see Oracle Doc ID 2001165.1).
It's recommended you do not set the AQ_TM_PROCESSES parameter in version 10.1 and later. Especially avoid setting this to zero since this will disable the QMON background job that is necessary to maintain the queues. You can reset this value to the Oracle default using the following command and restarting the database. alter system reset aq_tm_processes scope=spfile sid='*';
Create a queue that we can send a message to. Oracle will notify our stored procedure that a message has been enqueued and should be worked. We'll also add some subprograms we can use in an emergency to stop messages from being deqeued, allow dequeuing again, and run a simple batch job to work through all of the messages.
These examples were tested on Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit Production.
We will create a message type, a queue table that can hold the messages, and a queue. Messages in the queue will be dequeued first by priority then be their enqueue time. If anything goes wrong working the message and the dequeue is rolled-back AQ will make the message available for dequeue 3600 seconds later. It will do this 48 times before moving it an exception queue.
create type message_t as object
(
sender varchar2 ( 50 ),
message varchar2 ( 512 )
);
/
-- Type MESSAGE_T compiled
begin dbms_aqadm.create_queue_table(
queue_table => 'MESSAGE_Q_TBL',
queue_payload_type => 'MESSAGE_T',
sort_list => 'PRIORITY,ENQ_TIME',
multiple_consumers => false,
compatible => '10.0.0');
end;
/
-- PL/SQL procedure successfully completed.
begin dbms_aqadm.create_queue(
queue_name => 'MESSAGE_Q',
queue_table => 'MESSAGE_Q_TBL',
queue_type => 0,
max_retries => 48,
retry_delay => 3600,
dependency_tracking => false);
end;
/
-- PL/SQL procedure successfully completed.
Now that we have a place to put the messages lets create a package to manage and work messages in the queue.
create or replace package message_worker_pkg
is
queue_name_c constant varchar2(20) := 'MESSAGE_Q';
-- allows the workers to process messages in the queue
procedure enable_dequeue;
-- prevents messages from being worked but will still allow them to be created and enqueued
procedure disable_dequeue;
-- called only by Oracle Advanced Queueing. Do not call anywhere else.
procedure on_message_enqueued (context in raw,
reginfo in sys.aq$_reg_info,
descr in sys.aq$_descriptor,
payload in raw,
payloadl in number);
-- allows messages to be worked if we missed the notification (or a retry
-- is pending)
procedure work_old_messages;
end;
/
create or replace package body message_worker_pkg
is
-- raised by Oracle when we try to dequeue but no more messages are ready to
-- be dequeued at this moment
no_more_messages_ex exception;
pragma exception_init (no_more_messages_ex,
-25228);
-- allows the workers to process messages in the queue
procedure enable_dequeue
as
begin
dbms_aqadm.start_queue (queue_name => queue_name_c, dequeue => true);
end enable_dequeue;
-- prevents messages from being worked but will still allow them to be created and enqueued
procedure disable_dequeue
as
begin
dbms_aqadm.stop_queue (queue_name => queue_name_c, dequeue => true, enqueue => false);
end disable_dequeue;
procedure work_message (message_in in out nocopy message_t)
as
begin
dbms_output.put_line ( message_in.sender || ' says ' || message_in.message );
end work_message;
-- called only by Oracle Advanced Queueing. Do not call anywhere else.
procedure on_message_enqueued (context in raw,
reginfo in sys.aq$_reg_info,
descr in sys.aq$_descriptor,
payload in raw,
payloadl in number)
as
pragma autonomous_transaction;
dequeue_options_l dbms_aq.dequeue_options_t;
message_id_l raw (16);
message_l message_t;
message_properties_l dbms_aq.message_properties_t;
begin
dequeue_options_l.msgid := descr.msg_id;
dequeue_options_l.consumer_name := descr.consumer_name;
dequeue_options_l.wait := dbms_aq.no_wait;
dbms_aq.dequeue (queue_name => descr.queue_name,
dequeue_options => dequeue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
work_message (message_l);
commit;
exception
when no_more_messages_ex
then
-- it's possible work_old_messages already dequeued the message
commit;
when others
then
-- we don't need to have a raise here. I just wanted to point out that
-- since this will be called by AQ throwing the exception back to it
-- will have it put the message back on the queue and retry later
raise;
end on_message_enqueued;
-- allows messages to be worked if we missed the notification (or a retry
-- is pending)
procedure work_old_messages
as
pragma autonomous_transaction;
dequeue_options_l dbms_aq.dequeue_options_t;
message_id_l raw (16);
message_l message_t;
message_properties_l dbms_aq.message_properties_t;
begin
dequeue_options_l.wait := dbms_aq.no_wait;
dequeue_options_l.navigation := dbms_aq.first_message;
while (true) loop -- way out is no_more_messages_ex
dbms_aq.dequeue (queue_name => queue_name_c,
dequeue_options => dequeue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
work_message (message_l);
commit;
end loop;
exception
when no_more_messages_ex
then
null;
end work_old_messages;
end;
Next tell AQ that when a message is enqueued to MESSAGE_Q (and committed) notify our procedure it has work to do. AQ will start up a job in its own session to handle this.
begin
dbms_aq.register (
sys.aq$_reg_info_list (
sys.aq$_reg_info (user || '.' || message_worker_pkg.queue_name_c,
dbms_aq.namespace_aq,
'plsql://' || user || '.message_worker_pkg.on_message_enqueued',
hextoraw ('FF'))),
1);
commit;
end;
declare
enqueue_options_l dbms_aq.enqueue_options_t;
message_properties_l dbms_aq.message_properties_t;
message_id_l raw (16);
message_l message_t;
begin
-- only need to do this next line ONCE
dbms_aqadm.start_queue (queue_name => message_worker_pkg.queue_name_c, enqueue => true , dequeue => true);
message_l := new message_t ( 'Jon', 'Hello, world!' );
dbms_aq.enqueue (queue_name => message_worker_pkg.queue_name_c,
enqueue_options => enqueue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
commit;
end;