суббота, 31 мая 2014 г.

Коммуникации между сеансами в СУБД Oracle

В любой среде, где выполняются параллельные процессы, актуальны вопросы совместного использования общих ресурсов и обмена сообщениями между процессами. Одновременно открытые сеансы СУБД Oracle - именно такие параллельные процессы. Я уже рассматривал конкурентный доступ к общим ресурсам с помощью пакета DBMS_LOCK, а теперь поэкспериментирую со средствами обмена сигналами и данными между сеансами СУБД Oracle 11gR2 - пакетами DBMS_ALERT и DBMS_PIPE.

Пакет DBMS_ALERT поддерживает отправку и получение асинхронных уведомлений о событиях (alerts). Это могут быть уведомления об изменении данных в БД, отправленные триггером, или об окончании выполнения некоторой процедуры. Приложение в отдельном сеансе ожидает уведомления, на которые подписалось, и обрабатывает их тем или иным образом, например, отражая наступившие события в пользовательском интерфейсе или выполняя операции с данными, зависящие от наступления события.

Вот основные свойства уведомлений DBMS_ALERT, почерпнутые мной из официальной документации:

  • Привязка к транзакциям. Это значит, что ожидающий сеанс получит уведомление только после завершения транзакции в уведомляющем сеансе. А если транзакция отменена, то уведомление не будет отправлено.
  • Уведомления различаются именами. Многие сеансы могут отправлять и многие сеансы могут ожидать уведомления с одним и тем же именем.
  • Сеанс может подписаться на одно или более уведомлений и ожидать одно конкретное (DBMS_ALERT.WAITONE) или любое из тех, на которые он подписан (DBMS_ALERT.WAITANY).
  • Сеанс, ожидающий уведомление, не может делать что-то еще в время ожидания.
  • Процедуры DBMS_ALERT.WAITONE и DBMS_ALERT.WAITANY имеют опциональный параметр timeout для указания допустимого времени ожидания. Значение 0 означает проверку наличия уведомления и немедленный возврат. Значение по умолчанию задает максимальное время ожидания уведомления, 1000 дней.
  • Вместе с уведомлением может быть послано опциональное сообщение.
  • Если одно и то же уведомление отправляется чаще, чем принимается, то прежде отправленные уведомления теряются. Ожидающий сеанс получит последнее по времени.

Рассмотрим по шагам отправку и получение уведомления с помощью DBMS_ALERT, открыв два сеанса. В первом сеансе запустим следующий блок PL/SQL. В блоке выполняется подписка на уведомление myalert (строка 5), ожидание его до 1000 дней (строка 6), вывод сообщения о получении уведомления (строка 7) и удаление подписки на уведомление (строка 8):

SQL> set serveroutput on
SQL> DECLARE
  2      l_message VARCHAR2(4000);
  3      l_status PLS_INTEGER := 0;
  4  BEGIN
  5      dbms_alert.register('myalert');
  6      dbms_alert.waitone('myalert', l_message, l_status);
  7      dbms_output.put_line(SYSTIMESTAMP || ' : myalert : ' || l_status || ' : ' || l_message);
  8      dbms_alert.remove('myalert');
  9  END;
 10  /

Во втором сеансе отправим уведомление myalert и вернемся к первому сеансу, чтобы увидеть результат.

SQL> BEGIN
  2      dbms_alert.signal('myalert', 'Привет мир!');
  3  END;
  4  /
  
PL/SQL procedure successfully completed

SQL> -- блок PL/SQL в первом сеансе продолжает ждать!

SQL> COMMIT;
Commit complete

SQL> -- только теперь уведомление myalert отправлено

15-MAY-14 07.54.12.761837000 PM +11:00 : myalert : 0 : Привет мир!

PL/SQL procedure successfully completed

Итак,

  • один сеанс посылает уведомления при помощи DBMS_ALERT.SIGNAL и COMMIT.
  • другой сеанс
    1. подписывается на уведомления при помощи DBMS_ALERT.REGISTER,
    2. ожидает уведомления при помощи DBMS_ALERT.WAITONE (или WAITANY) и обрабатывает их,
    3. удаляет подписку на уведомления, когда в них больше нет необходимости.

Попробую отправлять разные уведомления из нескольких параллельных сеансов и получать эти уведомления в другом сеансе.

Для этого создам процедуру signaller, которая будет посылать 10 уведомлений bang или boom, выбирая из двух случайным образом (строка 8 ниже). Отправив уведомление, процедура спит случайное число секунд между 1 и 7, имитируя занятость, и затем отправляет следующее уведомление. Для создания процедуры текущий пользователь должен явно (не через роль) получить привилегии EXECUTE на пакеты SYS.DBMS_ALERT и SYS.DBMS_LOCK.

SQL> CREATE OR REPLACE PROCEDURE signaller
  2  AS
  3      l_rand PLS_INTEGER;
  4      l_alert VARCHAR2(30);
  5  BEGIN
  6      FOR i IN 1..10 LOOP
  7          l_rand := dbms_random.value(1, 7);
  8          l_alert := CASE MOD(l_rand, 2) WHEN 0 THEN 'bang' ELSE 'boom' END;
  9          dbms_alert.signal(l_alert, i || '-' || userenv('SESSIONID'));
 10          COMMIT; -- to actually send the alert
 11          dbms_lock.sleep(l_rand);
 12      END LOOP;
 13  END signaller;
 14  /
 
Procedure created
 
SQL> show errors
No errors

Для получения уведомлений bang и boom создам процедуру consumer с параметром p_sleep - числом секунд между вызовами DBMS_ALERT.WAITANY. На это время consumer будет делать паузы между ожиданиями уведомлений, что приведет к потере некоторых уведомлений, если они отправляются достаточно часто. По умолчанию p_sleep равен 0, что практически устраняет паузы между ожиданиями, и уведомления не должны теряться.


SQL> CREATE OR REPLACE PROCEDURE consumer(p_sleep PLS_INTEGER DEFAULT 0)
  2  AS
  3      l_name VARCHAR2(30);
  4      l_message VARCHAR2(4000);
  5      l_status PLS_INTEGER := 0;
  6  BEGIN
  7      dbms_alert.register('bang');
  8      dbms_alert.register('boom');
  9      dbms_output.put_line('Current session id: ' || userenv('SESSIONID'));
 10      WHILE l_status = 0 LOOP
 11          dbms_alert.waitany(l_name, l_message, l_status, 10);
 12          dbms_output.put_line(SYSTIMESTAMP || ' : ' || l_status || ' : ' || l_name || ' : ' || l_message);
 13          dbms_lock.sleep(p_sleep);
 14      END LOOP;
 15      dbms_alert.removeall;
 16  END;
 17  /
 
Procedure created
 
SQL> show errors
No errors

Каждый раз процедура ожидает уведомления не более 10 секунд - см. значение параметра timeout при вызове DBMS_ALERT.WAITANY в строке 11.

Теперь, с помощью DBMS_SCHEDULER, я запущу процедуру signaller параллельно в двух сеансах и процедуру consumer в текущем сеансе:

BEGIN
    -- запустить генераторы уведомлений в двух сеансах
    dbms_scheduler.create_job(
        job_name => 'signaller_1',
        job_type => 'STORED_PROCEDURE',
        job_action => 'signaller',
        enabled => TRUE);
    dbms_scheduler.create_job(
        job_name => 'signaller_2',
        job_type => 'STORED_PROCEDURE',
        job_action => 'signaller',
        enabled => TRUE);
    -- ждать и обрабатывать уведомления в текущем сеансе
    consumer;
END;
/

Current session id: 531136
15-MAY-14 08.32.22.968000000 PM +11:00 : 0 : BOOM : 1-531156
15-MAY-14 08.32.23.198000000 PM +11:00 : 0 : BANG : 1-531157
15-MAY-14 08.32.25.199000000 PM +11:00 : 0 : BOOM : 2-531157
15-MAY-14 08.32.27.969000000 PM +11:00 : 0 : BOOM : 2-531156
15-MAY-14 08.32.28.982000000 PM +11:00 : 0 : BANG : 3-531156
15-MAY-14 08.32.30.201000000 PM +11:00 : 0 : BOOM : 3-531157
15-MAY-14 08.32.32.983000000 PM +11:00 : 0 : BOOM : 4-531156
15-MAY-14 08.32.33.201000000 PM +11:00 : 0 : BOOM : 4-531157
15-MAY-14 08.32.33.984000000 PM +11:00 : 0 : BOOM : 5-531156
15-MAY-14 08.32.36.203000000 PM +11:00 : 0 : BOOM : 5-531157
15-MAY-14 08.32.38.985000000 PM +11:00 : 0 : BOOM : 6-531156
15-MAY-14 08.32.41.203000000 PM +11:00 : 0 : BANG : 6-531157
15-MAY-14 08.32.45.997000000 PM +11:00 : 0 : BOOM : 7-531156
15-MAY-14 08.32.46.998000000 PM +11:00 : 0 : BANG : 8-531156
15-MAY-14 08.32.47.204000000 PM +11:00 : 0 : BANG : 7-531157
15-MAY-14 08.32.49.205000000 PM +11:00 : 0 : BANG : 8-531157
15-MAY-14 08.32.52.999000000 PM +11:00 : 0 : BOOM : 9-531156
15-MAY-14 08.32.55.206000000 PM +11:00 : 0 : BANG : 9-531157
15-MAY-14 08.32.58.000000000 PM +11:00 : 0 : BOOM : 10-531156
15-MAY-14 08.32.59.207000000 PM +11:00 : 0 : BOOM : 10-531157
15-MAY-14 08.33.09.208000000 PM +11:00 : 1 :  : 10-531157
 
PL/SQL procedure successfully completed

Как видим, в текущем сеансе приняты все отправленные уведомления. Повторим эксперимент, введя паузу в 10 секунд между ожиданиями уведомлений:

BEGIN
    -- запустить генераторы уведомлений в двух сеансах
    dbms_scheduler.create_job(
        job_name => 'signaller_1',
        job_type => 'STORED_PROCEDURE',
        job_action => 'signaller',
        enabled => TRUE);
    dbms_scheduler.create_job(
        job_name => 'signaller_2',
        job_type => 'STORED_PROCEDURE',
        job_action => 'signaller',
        enabled => TRUE);
    -- ждать и обрабатывать уведомления в текущем сеансе
    consumer(10);
END;
/
 
Current session id: 531136
15-MAY-14 09.40.24.878000000 PM +11:00 : 0 : BANG : 1-531179
15-MAY-14 09.40.34.890000000 PM +11:00 : 0 : BOOM : 3-531179
15-MAY-14 09.40.44.902000000 PM +11:00 : 0 : BANG : 6-531179
15-MAY-14 09.40.54.914000000 PM +11:00 : 0 : BANG : 9-531179
15-MAY-14 09.41.04.925000000 PM +11:00 : 0 : BANG : 10-531180
15-MAY-14 09.41.14.937000000 PM +11:00 : 0 : BOOM : 10-531179
15-MAY-14 09.41.34.948000000 PM +11:00 : 1 :  : 10-531179
 
PL/SQL procedure successfully completed

На этот раз часть уведомлений была потеряна, чего и следовало ожидать.

В официальной документации по СУБД Oracle 11gR2 можно подробно познакомиться со всеми процедурами и функциями DBMS_ALERT. А я перейду к экспериментам с пакетом DBMS_PIPE, удалив ненужные теперь процедуры:

SQL> DROP PROCEDURE signaller;
Procedure dropped
 
SQL> DROP PROCEDURE consumer;
Procedure dropped

Пакет DBMS_PIPE позволяет двум или более сеансам пересылать друг другу данные по именованным каналам (pipes). Вот основные сведения о DBMS_PIPE:

  • Запись и чтение данных из каналов не зависит от транзакций в коммуницирующих сеансах.
  • Бывают public и private каналы. Публичные каналы доступны сеансам всех пользователей, имеющих привилегию EXECUTE для DBMS_PIPE. Частные каналы доступны только сеансам того же пользователя, который создал канал.
  • Публичные каналы могут быть явными (explicit) и неявными (implicit). Частные каналы всегда явные.
  • Неявный публичный канал открывается автоматически при первой отправке в него данных и автоматически удаляется, когда из него прочитаны все данные.
  • Явный канал создается при помощи DBMS_PIPE.CREATE_PIPE, параметр private указывает, создается ли публиччный или частный канал, и удаляется при помощи DBMS_PIPE.REMOVE_PIPE.
  • Многие сеансы могут одновременно писать в канал, указывая его имя. Как только данные из канала прочитаны одним из сеансов, данные удаляются из канала и более недоступны для чтения.
  • Сообщения для отправки в канал готовятся при помощи DBMS_PIPE.PACK_MESSAGE, после чего DBMS_PIPE.SEND_MESSAGE посылает все подготовленные сообщения в канал.
  • Данные читаются из канала при помощи DBMS_PIPE.RECEIVE_MESSAGE, после чего DBMS_PIPE.UNPACK_MESSAGE извлекает отдельные сообщения.
  • Деcкрипторы каналов и данные, записанные в канал, размещаются в SGA. А текущие открытые каналы отражаются во вью v$db_pipes.

Следующий PL/SQL блок открывает три канала - частный явный, публичный явный и публичный неявный:

SQL> set serveroutput on
SQL> DECLARE
  2      l_status PLS_INTEGER;
  3      l_message VARCHAR2(50) := 'Привет мир!';
  4  BEGIN
  5      l_status := dbms_pipe.create_pipe(pipename => 'my_private_pipe');
  6      dbms_output.put_line('my_private_pipe : ' || l_status);
  7  
  8      l_status := dbms_pipe.create_pipe(pipename => 'my_public_pipe', private => FALSE);
  9      dbms_output.put_line(' my_public_pipe : ' || l_status);
 10  
 11      dbms_pipe.pack_message(l_message);
 12      l_status := dbms_pipe.send_message(pipename => 'implicit_hello');
 13      dbms_output.put_line(' implicit_hello : ' || l_status);
 14  END;
 15  /
 
my_private_pipe : 0
 my_public_pipe : 0
 implicit_hello : 0
 
PL/SQL procedure successfully completed

SQL> SELECT * FROM v$db_pipes;

   OWNERID NAME                           TYPE     PIPE_SIZE
---------- ------------------------------ ------- ----------
           MY_PUBLIC_PIPE                 PUBLIC        4664
           IMPLICIT_HELLO                 PUBLIC        4664
        95 MY_PRIVATE_PIPE                PRIVATE       4664

Итак, каналы были созданы. Теперь удалю созданные каналы, воспользовавшись DBMS_PIPE.REMOVE_PIPE для явных каналов и прочитав данные из неявного:

SQL> DECLARE
  2      l_status PLS_INTEGER;
  3      l_message VARCHAR2(32767);
  4  BEGIN
  5      l_status := dbms_pipe.remove_pipe(pipename => 'my_private_pipe');
  6      dbms_output.put_line('my_private_pipe : ' || l_status);
  7  
  8      l_status := dbms_pipe.remove_pipe(pipename => 'my_public_pipe');
  9      dbms_output.put_line(' my_public_pipe : ' || l_status);
 10  
 11      l_status := dbms_pipe.receive_message(pipename => 'implicit_hello');
 12      dbms_output.put_line(' implicit_hello : ' || l_status);
 13      dbms_pipe.unpack_message(l_message);
 14      dbms_output.put_line(l_message);
 15   END;
 16  /

my_private_pipe : 0
 my_public_pipe : 0
 implicit_hello : 0
Привет мир!

PL/SQL procedure successfully completed

SQL> SELECT * FROM v$db_pipes;
   OWNERID NAME                           TYPE     PIPE_SIZE
---------- ------------------------------ ------- ----------
           MY_PUBLIC_PIPE                 PUBLIC           0
           IMPLICIT_HELLO                 PUBLIC        4664
           MY_PRIVATE_PIPE                PUBLIC           0

Как видим, после удаления каналы остались во вью v$db_pipes. Однако, вызов DBMS_PIPE.REMOVE_PIPE сбросил в 0 размеры каналов и изменил тип канала my_private_pipe с PRIVATE на PUBLIC. Не совсем то, чего можно было ожидать! При этом вызовы функции DBMS_PIPE.REMOVE_PIPE вернули статус 0, следовательно, выполнились без ошибок. В утешение остается заметить, что вью v$db_pipes не упоминается в документации по пакету DBMS_PIPE. И нет необходимости в него смотреть.

Теперь продемонстрирую вывод PL/SQL процедурой отладочных сообщений в канал и чтение этих сообщений в другом сеансе. Для этой цели буду использовать публичный неявный канал. Необходимые процедуры помещу в пакет DBG. Для создания пакета DBG текущий пользователь должен явно (не через роль) получить привилегию EXECUTE на пакет SYS.DBMS_PIPE.

SQL> CREATE OR REPLACE PACKAGE dbg AS
  2      PROCEDURE OPEN(p_name VARCHAR2);
  3      PROCEDURE WRITE(p_message VARCHAR2);
  4      PROCEDURE READ;
  5  END dbg;
  6  /

Package created

SQL> CREATE OR REPLACE PACKAGE BODY dbg AS
  2  
  3      PIPENAME VARCHAR2(30);
  4  
  5      PROCEDURE OPEN(p_name VARCHAR2) IS
  6      BEGIN
  7          PIPENAME := p_name;
  8      END OPEN;
  9  
 10      PROCEDURE WRITE(p_message VARCHAR2) IS
 11          l_status  NUMBER;
 12      BEGIN
 13          dbms_pipe.pack_message(SYSTIMESTAMP || ' : ' || p_message);
 14          l_status := dbms_pipe.send_message(PIPENAME, 0);
 15          IF l_status != 0 THEN
 16              raise_application_error(-20001, 'Error sending message: ' || l_status);
 17          END IF;
 18      END WRITE;
 19  
 20      PROCEDURE READ IS
 21          l_message VARCHAR2(8192);
 22          l_status NUMBER;
 23      BEGIN
 24          dbms_output.enable(1000000);
 25          dbms_output.put_line('-- debug messages ------------------------------------');
 26          LOOP
 27              l_status := dbms_pipe.receive_message(PIPENAME, 0);
 28              EXIT WHEN l_status != 0;
 29              LOOP
 30                  l_status := dbms_pipe.next_item_type;
 31                  EXIT WHEN l_status = 0;
 32                  IF l_status = 9 THEN
 33                      dbms_pipe.unpack_message(l_message);
 34                      dbms_output.put_line(l_message);
 35                  ELSE
 36                      dbms_output.put_line('!!! Unsupported message type ' || l_status);
 37                      BEGIN
 38                          dbms_pipe.unpack_message(l_message);
 39                          dbms_output.put_line(l_message);
 40                      EXCEPTION
 41                      WHEN OTHERS THEN
 42                          NULL;
 43                      END;
 44                  END IF;
 45              END LOOP;
 46          END LOOP;
 47          dbms_output.put_line ('-- end of debug messages ----------------------------');
 48      END READ;
 49  
 50  END dbg;
 51  /
 
Package body created

SQL> show error
No errors

Воспользуюсь созданным пакетом, чтобы писать в канал debug_pipe отладочные сообщения:

SQL> BEGIN
  2      dbg.open('debug_pipe');
  3      dbg.write('one!');
  4      dbms_lock.sleep(5);
  5      dbg.write('TWO!!!');
  6      dbms_lock.sleep(5);
  7      dbg.write('B-A-N-G!!!!!');
  8  END;
  9  /
PL/SQL procedure successfully completed

Открыв другой сеанс, прочитаю сообщения из канала:

SQL> set serveroutput on
SQL> BEGIN
  2      dbg.open('debug_pipe');
  3      dbg.read;
  4  END;
  5  /

-- debug messages ------------------------------------
31-MAY-14 01.32.57.038951000 PM +11:00 : one!
31-MAY-14 01.33.02.039387000 PM +11:00 : TWO!!!
31-MAY-14 01.33.07.040638000 PM +11:00 : B-A-N-G!!!!!
-- end of debug messages ----------------------------

PL/SQL procedure successfully completed

Работает!

Следующий эксперимент состоит в том, чтобы писать в канал сообщения из более чем одного сеанса, а затем прочитать их:

SQL> BEGIN
  2      FOR i IN 1..3 LOOP
  3          dbms_scheduler.create_job(
  4              job_name => 'qwerty' || i,
  5              job_type => 'PLSQL_BLOCK',
  6              job_action => q'[BEGIN dbg.open('debug_pipe'); dbg.write('ready?'); dbms_lock.sleep(5); dbg.write('B-A-N-G!!!!!'); END;]',
  7              ENABLED => TRUE
  8          );
  9      END LOOP;
 10  END;
 11  /

PL/SQL procedure successfully completed

Выждав больше 5 секунд, переключаюсь в другой сеанс и читаю сообщения из канала:

SQL> BEGIN
  2      dbg.open('debug_pipe');
  3      dbg.read;
  4  END;
  5  /

-- debug messages ------------------------------------
31-MAY-14 01.38.34.552739000 PM +11:00 : ready?
31-MAY-14 01.38.34.662290000 PM +11:00 : ready?
31-MAY-14 01.38.34.672965000 PM +11:00 : ready?
31-MAY-14 01.38.39.554072000 PM +11:00 : B-A-N-G!!!!!
31-MAY-14 01.38.39.663011000 PM +11:00 : B-A-N-G!!!!!
31-MAY-14 01.38.39.673978000 PM +11:00 : B-A-N-G!!!!!
-- end of debug messages ----------------------------

PL/SQL procedure successfully completed

Итак, сообщения, посланные в канал debug_pipe из трех параллельных сеансов, успешно прочитаны в текущем сеансе. Пакет DBG в самом деле удобное средство для сбора данных о работе выполняющегося приложения. Когда PL/SQL код, выводящий сообщения, запускается приложением, взаимодействующим с пользователем, отладочный канал позволяет разработчику видеть, что происходит.

Завершая разговор о DBMS_PIPE, замечу, что не все мои эксперименты с этим пакетом прошли гладко и привели к ожидаемому результату. Кто заинтересовался, может подробнее познакомиться с процедурами и функциями DBMS_PIPE по официально документации по СУБД Oracle и продолжить эксперименты.

Комментариев нет:

Отправить комментарий