eXtreme Parallel Processing (XPP) offers an easy way to rewrite existing sequential code into code that is processed in parallel.

Let's assume we have the following existing PL/SQL code:

declare

   l_text               varchar2(100);

   l_encrypted_raw      raw (2000);             -- stores encrypted binary text

   l_decrypted_raw      raw (2000);             -- stores decrypted binary text

   l_num_key_bytes      number := 256/8;        -- key length 256 bits (32 bytes)

   l_key_bytes_raw      raw (32);               -- stores 256-bit encryption key

   l_encryption_type    pls_integer :=          -- total encryption type

                            dbms_crypto.encrypt_aes256

                          + dbms_crypto.chain_cbc

                          + dbms_crypto.pad_pkcs5;   

begin

  dbms_output.put_line('open successful');

  for l_message_id in 1..4500   -- the TEST table holds ids 1..4500

   loop

     -- begin processing

      --- get record to be processed

      select to_char(changed, 'yyyy-mon-dd hh24:mi')

      into l_text

      from test

      where id = l_message_id;

 

      --- process record

      l_key_bytes_raw := dbms_crypto.randombytes (l_num_key_bytes);

      for i in 1..10000

      loop

         l_encrypted_raw := dbms_crypto.encrypt

            (

               src => utl_i18n.string_to_raw (l_text,  'AL32UTF8'),

               typ => l_encryption_type,

               key => l_key_bytes_raw

            );

          -- the encrypted value "l_encrypted_raw" can be used here

      

         l_text := to_char(i*i);

         l_decrypted_raw := dbms_crypto.decrypt

            (

               src => l_encrypted_raw,

               typ => l_encryption_type,

               key => l_key_bytes_raw

            );

         l_text := utl_i18n.raw_to_char (l_decrypted_raw, 'AL32UTF8');

      end loop;

     

      --- save result(s)

      update test

      set text = 'xpp example '||to_char(l_message_id)||' '||l_text,

          changed = sysdate

      where id = l_message_id;

     

   -- end processing

  end loop; 

  dbms_output.put_line('end loop');

 

end;

/
And the following table structure, which holds the requests to be processed:

prompt create table TEST

create table TEST

(

  id       number                               not null,

  text     varchar2(100 byte),

  changed  date,

  status   varchar2(4000 char),

  primary key (id)

)

tablespace &&SMALL_DATA;

 

comment on table TEST is

'Test table to try XPP processing example'

;

comment on column TEST.id      is 'Identification of row';

comment on column TEST.text    is 'Text array';

comment on column TEST.changed is 'Time of change to track final effect';

comment on column TEST.status  is 'Status of processing';

 

prompt Prepare data for test into TEST table

declare

   id   number;

begin

   for id in 1..4500

   loop

      -- begin processing

      insert into TEST (id, changed)

           values (id, sysdate);

   -- end processing

   end loop;

end;

/
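
Before going further, it can help to confirm that the test data is really in place. The query below is only a sanity check and is not part of XPP. It assumes the loader block above ran against an empty TEST table and that its inserts are visible to the session running the query.

```sql
-- Sanity check: the loader above inserts ids 1..4500 into TEST.
select count(*) as row_count,
       min(id)  as first_id,
       max(id)  as last_id
from   test;
```

Note that the loader block does not commit, so other sessions (including parallel executors) will not see the rows until a commit is issued.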


It can be adapted to use eXtreme Parallel Processing in the following way:
1) Collect all data required for processing and initialize eXtreme Parallel Processing (xpp.event_initiate).

2) Move the processing part of the loop into a separate procedure.

3) Grant execute on the new procedure to the eXtreme Parallel Processing schema. (We recommend keeping eXtreme Parallel Processing in a separate schema under a separate user.)

4) Call the processing through eXtreme Parallel Processing.

Step 1) Call the eXtreme Parallel Processing initialization.

Note: This step creates all required structures in XPP. The name of the newly defined type is returned in the last parameter (p_queue_type_name).
declare

  p_queue_name      varchar2(32767);

  p_queue_type_body varchar2(32767);

  p_storage_clause  varchar2(32767);

  p_queue_comments  varchar2(32767);

  p_user_to_grant   varchar2(32767);

  p_queue_type_name varchar2(32767); -- out

 

begin

  -- The name of the request queue (it is required in the following steps)

  p_queue_name := 'test02';

  -- This variable defines the attributes of the type passed as input to procedure to_test

  p_queue_type_body :=

  ' message_id     number(15)

  , subject        varchar2(100)

  , text           varchar2(100)

  , dollar_value   number(4,2)';

  -- This parameter is optional, but it can be useful

  p_storage_clause := 'tablespace &&small_data';

  -- This parameter is optional; it is only nice to have

  p_queue_comments := 'the first queue';

  -- This parameter should contain the name of the schema where XPP is installed

  p_user_to_grant := '&&xpp_user';

 

  &&xpp_owner..xpp.event_initiate (

     p_queue_name,

     p_queue_type_body,

     p_storage_clause,

     p_queue_comments,

     p_user_to_grant, p_queue_type_name );

  dbms_output.put_line('initiate successful, new type name: '''||p_queue_type_name||'''.');

 

  commit;

end;

/
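
To see what the initialization produced, the data dictionary can be queried. This is a sketch under two assumptions that the text above does not state outright: that the returned type name is TYPE_TEST02 (the name used in the later steps) and that XPP creates it as an ordinary Oracle object type in the XPP owner's schema, visible through ALL_TYPE_ATTRS.

```sql
-- List the attributes of the generated queue type.
-- Assumption: the type is named TYPE_TEST02 and is owned by &&xpp_owner.
select attr_no,
       attr_name,
       attr_type_name,
       length,
       precision,
       scale
from   all_type_attrs
where  owner     = upper('&&xpp_owner')
and    type_name = 'TYPE_TEST02'
order  by attr_no;
```

If the assumptions hold, the result should list the four attributes given in p_queue_type_body (message_id, subject, text, dollar_value).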

Step 2) The result will be the following:
create or replace procedure to_test (

   test_struct in &&xpp_owner..type_test02

)

as

   l_text             varchar2(100);

   l_encrypted_raw      raw (2000);             -- stores encrypted binary text

   l_decrypted_raw      raw (2000);             -- stores decrypted binary text

   l_num_key_bytes      number := 256/8;        -- key length 256 bits (32 bytes)

   l_key_bytes_raw      raw (32);               -- stores 256-bit encryption key

   l_encryption_type    pls_integer :=          -- total encryption type

                            dbms_crypto.encrypt_aes256

                          + dbms_crypto.chain_cbc

                          + dbms_crypto.pad_pkcs5;   

begin

 

      -- begin processing

      --- get record to be processed

      SELECT to_char(changed, 'YYYY-MON-DD HH24:MI')

      INTO l_text

      FROM test

      WHERE id = test_struct.message_id;

 

      --- process record

      l_key_bytes_raw := dbms_crypto.randombytes (l_num_key_bytes);

      for i in 1..10000

      loop

         l_encrypted_raw := dbms_crypto.encrypt

            (

               src => utl_i18n.string_to_raw (l_text,  'AL32UTF8'),

               typ => l_encryption_type,

               key => l_key_bytes_raw

            );

          -- the encrypted value "l_encrypted_raw" can be used here

      

         l_text := to_char(i*i);

         l_decrypted_raw := dbms_crypto.decrypt

            (

               src => l_encrypted_raw,

               typ => l_encryption_type,

               key => l_key_bytes_raw

            );

         l_text := utl_i18n.raw_to_char (l_decrypted_raw, 'AL32UTF8');

      end loop;

     

      --- save result(s)

      update test

      set text = test_struct.text || l_text,

          changed = sysdate

      where id = test_struct.message_id;

      -- end processing

 

      commit;

end to_test;

/


Step 3) Grant execute on the procedure to XPP.
grant execute on to_test to &&xpp_owner;
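
The grant can be verified from the granting schema with a standard dictionary view. This check is only an illustration and assumes the procedure created in step 2 is named to_test, as in the listing above.

```sql
-- Show who may execute the TO_TEST procedure owned by the current user.
select grantee,
       table_name,   -- holds the object name, here the procedure
       privilege
from   user_tab_privs
where  table_name = 'TO_TEST'
and    privilege  = 'EXECUTE';
```

The XPP owner schema should appear in the GRANTEE column.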

Step 4) Call the processing in XPP.
declare

    l_queue_name        varchar2(32767);

    l_parallel_level    number;

    l_name_of_proc      varchar2(32767);

    l_message           &&xpp_owner..type_test02;

    l_message_id        number(5);

    l_subject           varchar2(100);

    l_text              varchar2(100);

    l_dollar_value      number(4,2);  

begin

  l_queue_name     := 'test02'; -- The name of the request queue initiated in the previous step

  l_parallel_level := 3;        -- The number of required parallel executions

  l_name_of_proc   := '&&xpp_user..to_test'; -- The name of the procedure, including its schema

 

  dbms_output.put_line('test to execute parallel.');

 

  -- This procedure creates an ad hoc configuration for the processing

  &&xpp_owner..xpp.event_open (

     p_queue_name           => l_queue_name

    ,p_number_of_executors  => l_parallel_level

    ,p_procedure_to_run     => l_name_of_proc

  );

 

  dbms_output.put_line('open successful');

  commit;

 

  for l_message_id in 1..4500

  loop

       l_subject        := 'subject '||to_char(l_message_id);

       l_text           := 'xpp example '||to_char(l_message_id)||' ';

      

       l_message        := &&xpp_owner..type_test02(

                              l_message_id

                             ,l_subject

                             ,l_text

                             ,l_dollar_value

                           );

       -- The name of the package to be executed is derived from the queue name by adding the prefix "xppi_"

       -- The name of the procedure to be called is always "process"

       -- Parallel processing of the requests in the queue starts right after the first call

       &&xpp_owner..xppi_test02.process(

         p_queue_name => l_queue_name

        ,p_message => l_message

       );

 

       commit;

  end loop; 

  dbms_output.put_line('end loop');

 

  -- While this procedure is running, all queued requests are processed

  -- It returns once XPP has finished processing

  &&xpp_owner..xpp.event_wait_processing_end( l_queue_name);

  dbms_output.put_line('waiting finished successfully.');

  commit;

 

  -- This procedure is the opposite of event_open: it stops the processing

  -- and removes the configured number of required parallel processes.

  -- It also removes the configured names of the procedures executed during processing

  &&xpp_owner..xpp.event_close(l_queue_name);

  dbms_output.put_line('close successful');

  commit;

 

end;

/
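
While the block above is running, its progress can be watched from a second session. The query is a minimal sketch and is not part of XPP. It assumes that to_test is the only code that fills TEST.text (the loader leaves it null) and that each request is committed as it finishes, as the commit at the end of to_test suggests.

```sql
-- Progress check: rows whose TEXT is filled have been handled by to_test.
select count(*)               as total_rows,
       count(text)            as processed_rows,   -- count(col) skips nulls
       count(*) - count(text) as remaining_rows
from   test;
```

Running it repeatedly should show processed_rows growing until remaining_rows reaches zero.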

Note: Sometimes requests that were processed successfully must be handled differently from requests that raised an error during processing. The following two procedures handle each case:

create or replace procedure on_success (

   test_struct in &&xpp_owner..type_test02

)

as

   pragma autonomous_transaction;

begin

 

      --- save result(s)

      update test

      set status = 'done',

          changed = sysdate

      where id = test_struct.message_id;

      -- end processing

     

      commit;

end on_success;

/

 

create or replace procedure on_error (

   test_struct in &&xpp_owner..type_test02

  ,p_sqlcode   in varchar2

  ,p_sqlterm   in varchar2

  ,p_backtrace in varchar2

)

as

   pragma autonomous_transaction;

begin

      --- save result(s)

      update test

      set status = substr('err: ora-'||p_sqlcode||' p_sqlterm:'||p_sqlterm||' p_backtrace: '||p_backtrace, 1, 4000),

          changed = sysdate

      where id = test_struct.message_id;

      -- end processing

     

      commit;

end on_error;

/

 

 

Only the call to event_open changes, as follows:

 


declare

    l_queue_name        varchar2(32767);

    l_parallel_level    number;

    l_name_of_proc      varchar2(32767);

    l_name_of_proc_ok   varchar2(32767);

    l_name_of_proc_err  varchar2(32767);

    l_message           &&xpp_owner..type_test02;

    l_message_id        number(5);

    l_subject           varchar2(100);

    l_text              varchar2(100);

    l_dollar_value      number(4,2);  

begin

  l_queue_name     := 'test02'; -- The name of the request queue initiated in the previous step

  l_parallel_level := 3;        -- The number of required parallel executions

  l_name_of_proc     := '&&xpp_user..to_test'; -- The name of the procedure, including its schema

  l_name_of_proc_ok  := '&&xpp_user..on_success'; -- The name of the procedure to be called after successful processing

  l_name_of_proc_err := '&&xpp_user..on_error'; -- The name of the procedure to be called when processing raises an exception

 

  dbms_output.put_line('test to execute parallel.');

 

  -- This procedure creates an ad hoc configuration for the processing

  &&xpp_owner..xpp.event_open (

     p_queue_name           => l_queue_name

    ,p_number_of_executors  => l_parallel_level

    ,p_procedure_to_run     => l_name_of_proc

    ,p_procedure_on_success => l_name_of_proc_ok

    ,p_procedure_on_error   => l_name_of_proc_err

    ,p_param_on_error_input => 3

  );

 

  dbms_output.put_line('open successful');

  commit;

 

  for l_message_id in 1..4500

  loop

       l_subject        := 'subject '||to_char(l_message_id);

       l_text           := 'xpp example '||to_char(l_message_id)||' ';

      

       l_message        := &&xpp_owner..type_test02(

                              l_message_id

                             ,l_subject

                             ,l_text

                             ,l_dollar_value

                           );

       -- The name of the package to be executed is derived from the queue name by adding the prefix "xppi_"

       -- The name of the procedure to be called is always "process"

       -- Parallel processing of the requests in the queue starts right after the first call

       &&xpp_owner..xppi_test02.process(

         p_queue_name => l_queue_name

        ,p_message => l_message

       );

 

       commit;

  end loop; 

  dbms_output.put_line('end loop');

 

  -- While this procedure is running, all queued requests are processed

  -- It returns once XPP has finished processing

  &&xpp_owner..xpp.event_wait_processing_end( l_queue_name);

  dbms_output.put_line('waiting finished successfully.');

  commit;

 

  -- This procedure is the opposite of event_open: it stops the processing

  -- and removes the configured number of required parallel processes.

  -- It also removes the configured names of the procedures executed during processing

  &&xpp_owner..xpp.event_close(l_queue_name);

  dbms_output.put_line('close successful');

  commit;

 

end;

/
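
Once the run has finished, the STATUS column written by on_success and on_error can be summarized. The query below is a sketch that relies only on the values those two procedures store: the literal 'done' on success and a string starting with 'err:' on failure. Rows with any other status are reported as not yet handled.

```sql
-- Summarize the outcome of the run by the status the callbacks wrote.
select outcome,
       count(*) as requests
from  (select case
                 when status = 'done'     then 'done'
                 when status like 'err:%' then 'error'
                 else                          'not reported'
              end as outcome
       from   test)
group  by outcome
order  by outcome;
```

To inspect individual failures, a follow-up such as "select id, status from test where status like 'err:%' order by id" shows the stored error code, message, and backtrace per request.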

The test of the original sequential processing takes about 90 minutes.

The test of XPP processing takes about 30 minutes.
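
The two timings can be turned into a speedup figure. The calculation assumes that the 30-minute run used three executors, as configured by l_parallel_level := 3 in the example; the text does not say so explicitly.

```sql
-- Speedup = sequential time / parallel time; efficiency = speedup / executors.
-- Assumption: 3 executors, as in the example configuration.
select 90 / 30       as speedup,
       (90 / 30) / 3 as efficiency_per_executor
from   dual;
```

Under that assumption the speedup is 3 and the efficiency is 1, that is, close to linear scaling for this CPU-bound test.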

For more details, see http://1stsw.com/xpp_documentation/