The eXtreme Parallel Processing introduce easy way hot to rewrite current code into parallel processed code.
Let's
assume we have current PLSQL code:
declare
l_text varchar2(100);
l_encrypted_raw raw
(2000); --
stores encrypted binary text
l_decrypted_raw raw
(2000); --
stores decrypted binary text
l_num_key_bytes number
:= 256/8; -- key length 256
bits (32 bytes)
l_key_bytes_raw raw
(32); --
stores 256-bit encryption key
l_encryption_type pls_integer
:= --
total encryption type
dbms_crypto.encrypt_aes256
+
dbms_crypto.chain_cbc
+
dbms_crypto.pad_pkcs5;
begin
dbms_output.put_line('open
sucessfull');
for
l_message_id in
1..10000
loop
-- begin processing
--- get record to be
processed
select
to_char(changed,
'yyyy-mon-dd hh24:mi')
into
l_text
from
test
where
id =
test_struct.message_id;
--- process record
l_key_bytes_raw
:= dbms_crypto.randombytes
(l_num_key_bytes);
for
i in 1..10000
loop
l_encrypted_raw
:= dbms_crypto.encrypt
(
src =>
utl_i18n.string_to_raw (l_text, 'AL32UTF8'),
typ =>
l_encryption_type,
key
=> l_key_bytes_raw
);
--
the encrypted value "l_encrypted_raw" can
be used here
l_text :=
to_char(i*i);
l_decrypted_raw
:= dbms_crypto.decrypt
(
src =>
l_encrypted_raw,
typ =>
l_encryption_type,
key
=> l_key_bytes_raw
);
l_text :=
utl_i18n.raw_to_char (l_decrypted_raw,
'AL32UTF8');
end
loop;
--- save result(s)
update
test
set
text = test_struct.text || l_text,
changed =
sysdate
where
id =
test_struct.message_id;
-- end processing
end
loop;
dbms_output.put_line('end
loop');
end;
/
And following structure of requests for processing:
prompt
create table
TEST
create
table TEST
(
id number not
null,
text
varchar2(100
byte),
changed
date,
status
varchar2(4000
char),
primary
key (id)
)
tablespace
&&SMALL_DATA;
comment
on table
TEST is
'Test
table to try XPP processing example'
;
comment
on column
TEST.id is
'Identification of row';
comment
on column
TEST.text is
'Text array';
comment
on column
TEST.changed
is 'Time of change to track
final effect';
comment
on column
TEST.status is
'Status of processing';
prompt
Prepare data
for test
into TEST
table
declare
id number;
begin
for
id in
1..4500
loop
-- begin processing
insert
into TEST
(id, changed)
values
(id, sysdate);
-- end processing
end
loop;
end;
/
It can be updated for using of the eXtreme Parallel
Processing following way:
1) Collect data required all required data for processing and initiate eXtreme Parallel Processing (xpp.initialize)
2) The
processing part in the loop should be separated into procedure.
3) Grant the new procedure to eXtreme Parallel Processing package. (We recommend to keep eXtreme Parallel Processing in separate schema under
separate user.)
4) Call processing using eXtreme Parallel Processing
Add 1) Call eXtreme Parallel Processing
initialization.
Note:
This step is creation in XPP all required structures and name of newly defined
type is returned in parameter
declare
p_queue_name varchar2(32767);
p_queue_type_body varchar2(32767);
p_storage_clause varchar2(32767);
p_queue_comments varchar2(32767);
p_user_to_grant varchar2(32767);
p_queue_type_name varchar2(32767);
-- out
begin
-- The name of
request queue (The name of queue is required for next processing.)
p_queue_name :=
'test02';
-- In this variable
is defined type required for input into procedure to_test
p_queue_type_body :=
' message_id number(15)
, subject varchar2(100)
, text varchar2(100)
, dollar_value number(4,2)';
-- This parameter is
not necessary, but it can be usefull
p_storage_clause :=
'tablespace &&small_data';
-- This parameter is
not necessary. It's only nice to have
p_queue_comments :=
'the first queue';
-- In this parameter
should be name of schema where is XPP installed
p_user_to_grant :=
'&&xpp_user';
&&xpp_owner..xpp.event_initiate
(
p_queue_name,
p_queue_type_body,
p_storage_clause,
p_queue_comments,
p_user_to_grant,
p_queue_type_name );
dbms_output.put_line('initiate
sucessfull new type name: '''||p_queue_type_name||'''.');
commit;
end;
/
Add 2) Result will be following:
create or
replace procedure
to_test (
test_struct in
&&xpp_owner..type_test02
)
as
l_text varchar2(100);
l_encrypted_raw raw
(2000); --
stores encrypted binary text
l_decrypted_raw raw
(2000); --
stores decrypted binary text
l_num_key_bytes number
:= 256/8; -- key length 256
bits (32 bytes)
l_key_bytes_raw raw
(32); --
stores 256-bit encryption key
l_encryption_type pls_integer
:= --
total encryption type
dbms_crypto.encrypt_aes256
+
dbms_crypto.chain_cbc
+
dbms_crypto.pad_pkcs5;
begin
-- begin processing
--- get record to be
processed
SELECT
to_char(changed,
'YYYY-MON-DD HH24:MI')
INTO
l_text
FROM
test
WHERE
id =
test_struct.message_id;
--- process record
l_key_bytes_raw
:= dbms_crypto.randombytes
(l_num_key_bytes);
for
i in 1..10000
loop
l_encrypted_raw
:= dbms_crypto.encrypt
(
src =>
utl_i18n.string_to_raw (l_text, 'AL32UTF8'),
typ =>
l_encryption_type,
key
=> l_key_bytes_raw
);
--
the encrypted value "l_encrypted_raw" can
be used here
l_text :=
to_char(i*i);
l_decrypted_raw
:= dbms_crypto.decrypt
(
src =>
l_encrypted_raw,
typ =>
l_encryption_type,
key
=> l_key_bytes_raw
);
l_text :=
utl_i18n.raw_to_char (l_decrypted_raw,
'AL32UTF8');
end
loop;
--- save result(s)
update
test
set
text = test_struct.text || l_text,
changed =
sysdate
where
id =
test_struct.message_id;
-- end processing
commit;
end
to_test;
/
Add 3) Grant of the procedure to XPP.
grant execute on to_execute to &&xpp_owner;
Add 4) Call of processing in XPP.
declare
l_queue_name varchar2(32767);
l_parallel_level number;
l_name_of_proc varchar2(32767);
l_message &&xpp_owner..type_test02;
l_message_id number(5);
l_subject varchar2(100);
l_text varchar2(100);
l_dollar_value number(4,2);
begin
l_queue_name :=
'test02'; --
The name of queue of requests initiate in previos
step
l_parallel_level :=
3; -- The number of
required parallel executions
l_name_of_proc :=
'&&xpp_user..to_test';
-- The name of procedure including current schema
dbms_output.put_line('test
to execute parallel.');
-- This proceedure is creation ad hock configuration for
processing
&&xpp_owner..xpp.event_open
(
p_queue_name =>
l_queue_name
,p_number_of_executors =>
l_parallel_level
,p_procedure_to_run =>
l_name_of_proc
);
dbms_output.put_line('open
sucessfull');
commit;
for
l_message_id in
1..4500
loop
l_subject :=
'subject '||to_char(l_message_id);
l_text :=
'xpp example '||to_char(l_message_id)||'
';
l_message :=
&&xpp_owner..type_test02(
l_message_id
,l_subject
,l_text
,l_dollar_value
);
-- The name of
package to be executed is derivated from name of
queue by adding prefix "xppi_"
-- The name of
procedure to be called is always "process"
-- Directly after the
first call is created parallel processing of requets
in the queue
&&xpp_owner..xppi_test02.process(
p_queue_name
=> l_queue_name
,p_message =>
l_message
);
commit;
end
loop;
dbms_output.put_line('end
loop');
-- While running this
procedure is processed all requests
-- It finish after
the end of processing XPP
&&xpp_owner..xpp.event_wait_processing_end(
l_queue_name);
dbms_output.put_line('waiting
finished sucessfull.');
commit;
-- This procedure is oposite of event_open if stop
processing
-- and remove
configuration of number of required parallel processes.
-- It also removes
configuration of names to be executed while processing
&&xpp_owner..xpp.event_close(l_queue_name);
dbms_output.put_line('close
sucessfull');
commit;
end;
/
Note: Sometime is required to
handle different way successfully processed and requests where raise while
processing an error.
create
or replace
procedure on_success
(
test_struct in
&&xpp_owner..type_test02
)
as
pragma
autonomous_transaction;
begin
--- save result(s)
update
test
set
status = 'done',
changed =
sysdate
where
id =
test_struct.message_id;
-- end processing
commit;
end
on_success;
/
create
or replace
procedure on_error
(
test_struct in
&&xpp_owner..type_test02
,p_sqlcode in
varchar2
,p_sqlterm in
varchar2
,p_backtrace in
varchar2
)
as
pragma
autonomous_transaction;
begin
--- save result(s)
update
test
set
status = substr('err:
ora-'||p_sqlcode||' p_sqlterm:'||p_sqlterm||' p_backtrace: '||p_backtrace,
1,
4000),
changed =
sysdate
where
id =
test_struct.message_id;
-- end processing
commit;
end
on_error;
/
The execution is changed only
while calling event_open following way:
declare
l_queue_name varchar2(32767);
l_parallel_level number;
l_name_of_proc varchar2(32767);
l_name_of_proc_ok varchar2(32767);
l_name_of_proc_err varchar2(32767);
l_message &&xpp_owner..type_test02;
l_message_id number(5);
l_subject varchar2(100);
l_text varchar2(100);
l_dollar_value number(4,2);
begin
l_queue_name :=
'test02'; --
The name of queue of requests initiate in previos
step
l_parallel_level :=
3; -- The number of
required parallel executions
l_name_of_proc :=
'&&xpp_user..to_test';
-- The name of procedure including current schema
l_name_of_proc_ok :=
'&&xpp_user..on_success';
-- The name of procedure to be called after successful
processing
l_name_of_proc_err :=
'&&xpp_user..on_error';
-- The name of procedure to be called when processing
raise exception
dbms_output.put_line('test
to execute parallel.');
-- This proceedure is creation ad hock configuration for
processing
&&xpp_owner..xpp.event_open
(
p_queue_name =>
l_queue_name
,p_number_of_executors =>
l_parallel_level
,p_procedure_to_run =>
l_name_of_proc
,p_procedure_on_success
=> l_name_of_proc_ok
,p_procedure_on_error =>
l_name_of_proc_err
,p_param_on_error_input
=> 3
);
dbms_output.put_line('open
sucessfull');
commit;
for
l_message_id in
1..4500
loop
l_subject :=
'subject '||to_char(l_message_id);
l_text :=
'xpp example '||to_char(l_message_id)||'
';
l_message :=
&&xpp_owner..type_test02(
l_message_id
,l_subject
,l_text
,l_dollar_value
);
-- The name of
package to be executed is derivated from name of
queue by adding prefix "xppi_"
-- The name of
procedure to be called is always "process"
-- Directly after the
first call is created parallel processing of requets
in the queue
&&xpp_owner..xppi_test02.process(
p_queue_name
=> l_queue_name
,p_message =>
l_message
);
commit;
end
loop;
dbms_output.put_line('end
loop');
-- While running this
procedure is processed all requests
-- It finish after
the end of processing XPP
&&xpp_owner..xpp.event_wait_processing_end(
l_queue_name);
dbms_output.put_line('waiting
finished sucessfull.');
commit;
-- This procedure is oposite of event_open if stop
processing
-- and remove
configuration of number of required parallel processes.
-- It also removes
configuration of names to be executed while processing
&&xpp_owner..xpp.event_close(l_queue_name);
dbms_output.put_line('close
sucessfull');
commit;
end;
/
The test of original sequential processing is taking about 90 minutes.
The test of XPP processing is taking
about 30 minutes.
For more details check http://1stsw.com/xpp_documentation/