11 июля 2015

Решения задачи прореживания котировок для FOREX и банков через MATERIALIZED VIEW REFRESH FAST ON COMMIT

Передо мной была поставлена такая задача:

1) В таблицу ежеминутно добавляются котировки FOREX
2) Строка котировки состоит из таких значений: время (в секундах, относительно точки отсчёта, в примере далее -- смещение относительно 1 января 2014 года), курс, который существовал на момент начала интервала, на момент конца интервала, минимальный достигнутый курс в интервале, максимальный, и объём операций.
Строка котировок -- типична для рынка FOREX и для фондового рынка
3) Необходимо из минутных строк сформировать пятиминутные, 15-минутные, получасовые, и часовые строки.
Алгоритм расчёта: OPEN -- first open, CLOSE -- last close, min -- MIN(), max -- MAX (), объём -- SUM ()
4) Сделать максимально эффективно, удобно, и _надёжно_


На момент постановки задачи мне было предложено три варианта решения:
1. Процедура, перебирающая построчно (отбросил сразу, как низкопроизводительный вариант)
2. MERGE (отбросил, потому что может выполняться UPDATE, а для транзакционных таблиц нужно установить PCTFREE 0, и могут появляться сцепленные и мигрировавшие строки)
3. INSERT ALL (отбросил, потому что громоздко)

В итоге пока я сделал четвёртый вариант, с подходом "одним оператором", как рекомендовал Кайт.
Подход "Одним оператором" не означает, что оператор будет только один. Это означает, что не будет циклов, перебирающих строки по одной.
В итоге операторов получилось 3 (выбор таблицы осуществляется с помощью динамического SQL).

Эти три оператора:
1) Расчёт верного прореживания во временную таблицу
2) Удаление строк, которые есть в фактической таблице, но нет во временной
3) Добавление строк, которые есть во временной таблице, но нет в фактической

Но поразмыслив, сделал пятый вариант -- на MATERIALIZED VIEW REFRESH FAST ON COMMIT.

Вот путь решения

Сама по себе задача прореживания решается таким запросом

select trunc (ASECONDS / 300) * 300 as ASECONDS
     , avg (AOPEN) keep (dense_rank first order by ASECONDS) as AOPEN
     , avg (ACLOSE) keep (dense_rank last order by ASECONDS) as ACLOSE
     -- для FIRST и LAST агрегатная функция может быть любая
     , min (AMIN) as AMIN
     , max (AMAX) as AMAX
     , sum (AVOLUME) as AVOLUME
from V5_RATES_1MIN
group by trunc (ASECONDS / 300) * 300


Здесь прореживается из минутных интервалов до пятиминутных (300 сек)

Здесь и далее в качестве даты используется не поле типа DATE, а число, указывающее смещение в секундах от 1 января 2014. Это везде поле ASECONDS

Для такой трансляции я применяю такие функции

create or replace function DATE2SECONDS (parm date) return number deterministic is
begin return round ((parm - to_date ('01.01.2014', 'DD.MM.YYYY')) * 86400); end;
/
create or replace function SECONDS2DATE (parm number) return date deterministic is
begin return to_date ('01.01.2014', 'DD.MM.YYYY') + parm / 86400; end;
/

Было бы отлично, если бы из этого запроса можно было сделать MATERIALIZED VIEW с REFRESH FAST желательно, с опцией ON COMMIT

К сожалению, это невозможно. Этому препятствуют функции FIRST и LAST

Но решить эту задачу всё таки возможно иным путём.

Для этого запрос придётся переписать в несколько шагов
with
T1 as (select trunc (ASECONDS / 300) * 300 as ASECONDS
            , min (ASECONDS) as ASECONDS_OPEN
            , max (ASECONDS) as ASECONDS_CLOSE
            , min (AMIN) as AMIN
            , max (AMAX) as AMAX
            , sum (AVOLUME) as AVOLUME
       from V5_RATES_1MIN
       group by trunc (ASECONDS / 300) * 300)
select a.ASECONDS
     , b.AOPEN, c.ACLOSE -- из связанных таблиц
     , a.AMIN, a.AMAX
     , a.AVOLUME
from V5_RATES_5MIN_T1 a
   , V5_RATES_1MIN b, V5_RATES_1MIN c -- связанные таблицы
where a.ASECONDS_OPEN = b.ASECONDS and a.ASECONDS_CLOSE = c.ASECONDS


Напрямую из этого запроса тоже создать MATVIEW REFRESH FAST тоже нельзя, но этот запрос можно разбить на 2 части:

1) Верхняя часть, вычисляющая минимальное и максимальное время, из которого мы потом будем брать значения OPEN и CLOSE (на картинке далее эти MATVIEW называются *_PRELIMINARY). Тип MATVIEW -- "с предпосчитанными агрегатами"

2) Собственно, вычисление значений OPEN и CLOSE. Тип MATVIEW -- "join matview"

А вот эти 2 части уже можно превратить в MATERIALIZED VIEW REFRESH FAST

И соединить их каскадно.


Теперь вычисление прореженных данных может выполняться по такому алгоритму (с минуты до часа)





Стрелки на диаграмме показывают, на основании чего строится каждое MATVIEW, т.е предварительное -- на основании одной таблицы, окончательное -- на основании двух.

Причём на все MAT VIEW можно установить атрибуты REFRESH FAST ON COMMIT (!)

Таким образом, добавив одну котировку в исходную таблицу V5_RATES_1MIN автоматом пересчитаются все котировки до часовой включительно.
Также, если мы загрузили ошибочные данные, их можно удалить из исходной таблицы, загрузить верные, и опять всё пересчитается.

Можно быть уверенными в том, что котировки всех периодов актуальны. Запуск процедур пересчёта через планировщик не требуется.

Но применять на рабочей базе такой алгоритм следует с осторожностью -- затраты на каскадное обновление великоваты. При добавлении одной строки это не заметно, но при добавлении строк за месяц -- затраты значительны.
Также, следует обратить внимание, как созданы уникальные индексы на окончательных таблицах, в противовес предварительным





DROP TABLE ORIG_RATES_1MIN CASCADE CONSTRAINTS;
 
CREATE TABLE ORIG_RATES_1MIN
(
  SNAPSHOT_TIME  DATE               not null,
  AOPEN          CHAR(128 BYTE)     not null,
  ACLOSE         CHAR(128 BYTE)     not null,
  AMIN           CHAR(128 BYTE)     not null,
  AMAX           CHAR(128 BYTE)     not null,
  AVOLUME        CHAR(128 BYTE)     not null
)
TABLESPACE USERS PCTFREE 0;
 
DROP TABLE ORIG_RATES_60MIN CASCADE CONSTRAINTS;
 
CREATE TABLE ORIG_RATES_60MIN
(
  SNAPSHOT_TIME  DATE               not null,
  AOPEN          CHAR(128 BYTE)     not null,
  ACLOSE         CHAR(128 BYTE)     not null,
  AMIN           CHAR(128 BYTE)     not null,
  AMAX           CHAR(128 BYTE)     not null,
  AVOLUME        CHAR(128 BYTE)     not null
)
TABLESPACE USERS PCTFREE 0;
 
drop materialized view        V5_RATES_60MIN;
drop materialized view log on V5_RATES_60MIN_PRELIMINARY;
drop materialized view        V5_RATES_60MIN_PRELIMINARY;
drop materialized view log on V5_RATES_30MIN;
 
drop materialized view        V5_RATES_30MIN;
drop materialized view log on V5_RATES_30MIN_PRELIMINARY;
drop materialized view        V5_RATES_30MIN_PRELIMINARY;
drop materialized view log on V5_RATES_15MIN;
 
drop materialized view        V5_RATES_15MIN;
drop materialized view log on V5_RATES_15MIN_PRELIMINARY;
drop materialized view        V5_RATES_15MIN_PRELIMINARY;
drop materialized view log on V5_RATES_5MIN;
 
drop materialized view        V5_RATES_5MIN;
drop materialized view log on V5_RATES_5MIN_PRELIMINARY;
drop materialized view        V5_RATES_5MIN_PRELIMINARY;
drop materialized view log on V5_RATES_1MIN;
 
drop table V5_RATES_1MIN;
 
 
create or replace function DATE2SECONDS (parm date) return number deterministic s
begin return round ((parm - to_date ('01.01.2014', 'DD.MM.YYYY')) * 86400); end;
/
 
create or replace function SECONDS2DATE (parm number) return date deterministic s
begin return to_date ('01.01.2014', 'DD.MM.YYYY') + parm / 86400; end;
/

create table V5_RATES_1MIN (
    ASECONDS            number     not null,
    AOPEN               number     not null,
    ACLOSE              number     not null,
    AMIN                number     not null,
    AMAX                number     not null,
    AVOLUME             number     not null)
tablespace USERS pctfree 0;
 
create unique index V5_RATES_1MIN_I1 on V5_RATES_1MIN (ASECONDS);
 
-- 5 minutes *******************************************************************
 
create materialized view log on V5_RATES_1MIN
tablespace USERS pctfree 0
with rowid, commit scn, sequence (ASECONDS, AOPEN, ACLOSE, AMIN, AMAX, AVOLUME)
including new values;
 
create materialized view V5_RATES_5MIN_PRELIMINARY
            tablespace USERS pctfree 0 build immediate
using index tablespace USERS pctfree 0
refresh fast on commit with rowid
as
select trunc (ASECONDS / (5 * 60)) * (5 * 60)   as ASECONDS
     , min   (ASECONDS)          as ASECONDS_OPEN
     , max   (ASECONDS)          as ASECONDS_CLOSE
     , min   (AMIN)              as AMIN
     , max   (AMAX)              as AMAX
     , sum   (AVOLUME)           as AVOLUME
     , count (*)                 as COUNT_DUMMY
     , count (AVOLUME)           as COUNT_DUMMY2
from V5_RATES_1MIN
group by trunc (ASECONDS / (5 * 60)) * (5 * 60);
 
create unique index V5_RATES_5MIN_PRELIMINARY_I1 on V5_RATES_5MIN_PRELIMINARY (ASECONDS);
 
create materialized view log on V5_RATES_5MIN_PRELIMINARY
tablespace USERS pctfree 0
with rowid, commit scn, sequence (ASECONDS, ASECONDS_OPEN, ASECONDS_CLOSE)
including new values;
 
create materialized view V5_RATES_5MIN
            tablespace USERS pctfree 0 build immediate
using index tablespace USERS pctfree 0
refresh fast on commit with rowid as
select a.ASECONDS
     , b.AOPEN, c.ACLOSE                           -- из связанных таблиц
     , a.AMIN, a.AMAX
     , a.AVOLUME
     , a.rowid as R1, b.rowid as R2, c.rowid as R3 -- это необходимо для join matview
from V5_RATES_5MIN_PRELIMINARY a, V5_RATES_1MIN b, V5_RATES_1MIN c
where a.ASECONDS_OPEN = b.ASECONDS and a.ASECONDS_CLOSE = c.ASECONDS;

--create unique index V5_RATES_5MIN_I1 on V5_RATES_5MIN (ASECONDS);
alter table V5_RATES_5MIN add constraint V5_RATES_5MIN_I1 unique (ASECONDS) initially deferred;

-- 15 minutes ******************************************************************
 
create materialized view log on V5_RATES_5MIN
tablespace USERS pctfree 0
with rowid, commit scn, sequence (ASECONDS, AOPEN, ACLOSE, AMIN, AMAX, AVOLUME)
including new values;
 
create materialized view V5_RATES_15MIN_PRELIMINARY
            tablespace USERS pctfree 0 build immediate
using index tablespace USERS pctfree 0
refresh fast on commit with rowid
as
select trunc (ASECONDS / (15 * 60)) * (15 * 60) as ASECONDS
     , min   (ASECONDS)          as ASECONDS_OPEN
     , max   (ASECONDS)          as ASECONDS_CLOSE
     , min   (AMIN)              as AMIN
     , max   (AMAX)              as AMAX
     , sum   (AVOLUME)           as AVOLUME
     , count (*)                 as COUNT_DUMMY
     , count (AVOLUME)           as COUNT_DUMMY2
from V5_RATES_5MIN
group by trunc (ASECONDS / (15 * 60)) * (15 * 60);
 
create unique index V5_RATES_15MIN_PRELIMINARY_I1 on V5_RATES_15MIN_PRELIMINARY (ASECONDS);
 
create materialized view log on V5_RATES_15MIN_PRELIMINARY
tablespace USERS pctfree 0
with rowid, commit scn, sequence (ASECONDS, ASECONDS_OPEN, ASECONDS_CLOSE)
including new values;
 
create materialized view V5_RATES_15MIN
            tablespace USERS pctfree 0 build immediate
using index tablespace USERS pctfree 0
refresh fast on commit with rowid as
select a.ASECONDS
     , b.AOPEN, c.ACLOSE                           -- из связанных таблиц
     , a.AMIN, a.AMAX
     , a.AVOLUME
     , a.rowid as R1, b.rowid as R2, c.rowid as R3 -- это необходимо для join matview
from V5_RATES_15MIN_PRELIMINARY a, V5_RATES_5MIN b, V5_RATES_5MIN c
where a.ASECONDS_OPEN = b.ASECONDS and a.ASECONDS_CLOSE = c.ASECONDS;
 
--create unique index V5_RATES_15MIN_I1 on V5_RATES_15MIN (ASECONDS);
alter table V5_RATES_15MIN add constraint V5_RATES_15MIN_I1 unique (ASECONDS) initially deferred;

-- 30 minutes ******************************************************************
 
create materialized view log on V5_RATES_15MIN
tablespace USERS pctfree 0
with rowid, commit scn, sequence (ASECONDS, AOPEN, ACLOSE, AMIN, AMAX, AVOLUME)
including new values;
 
create materialized view V5_RATES_30MIN_PRELIMINARY
            tablespace USERS pctfree 0 build immediate
using index tablespace USERS pctfree 0
refresh fast on commit with rowid
as
select trunc (ASECONDS / (30 * 60)) * (30 * 60) as ASECONDS
     , min   (ASECONDS)          as ASECONDS_OPEN
     , max   (ASECONDS)          as ASECONDS_CLOSE
     , min   (AMIN)              as AMIN
     , max   (AMAX)              as AMAX
     , sum   (AVOLUME)           as AVOLUME
     , count (*)                 as COUNT_DUMMY
     , count (AVOLUME)           as COUNT_DUMMY2
from V5_RATES_15MIN
group by trunc (ASECONDS / (30 * 60)) * (30 * 60);
 
create unique index V5_RATES_30MIN_PRELIMINARY_I1 on V5_RATES_30MIN_PRELIMINARY (ASECONDS);
 
create materialized view log on V5_RATES_30MIN_PRELIMINARY
tablespace USERS pctfree 0
with rowid, commit scn, sequence (ASECONDS, ASECONDS_OPEN, ASECONDS_CLOSE)
including new values;
 
create materialized view V5_RATES_30MIN
            tablespace USERS pctfree 0 build immediate
using index tablespace USERS pctfree 0
refresh fast on commit with rowid as
select a.ASECONDS
     , b.AOPEN, c.ACLOSE                           -- из связанных таблиц
     , a.AMIN, a.AMAX
     , a.AVOLUME
     , a.rowid as R1, b.rowid as R2, c.rowid as R3 -- это необходимо для join matview
from V5_RATES_30MIN_PRELIMINARY a, V5_RATES_15MIN b, V5_RATES_15MIN c
where a.ASECONDS_OPEN = b.ASECONDS and a.ASECONDS_CLOSE = c.ASECONDS;
 
--create unique index V5_RATES_30MIN_I1 on V5_RATES_30MIN (ASECONDS);
alter table V5_RATES_30MIN add constraint V5_RATES_30MIN_I1 unique (ASECONDS) initially deferred;
 
-- 60 minutes ******************************************************************
 
create materialized view log on V5_RATES_30MIN
tablespace USERS pctfree 0
with rowid, commit scn, sequence (ASECONDS, AOPEN, ACLOSE, AMIN, AMAX, AVOLUME)
including new values;
 
create materialized view V5_RATES_60MIN_PRELIMINARY
            tablespace USERS pctfree 0 build immediate
using index tablespace USERS pctfree 0
refresh fast on commit with rowid
as
select trunc (ASECONDS / (60 * 60)) * (60 * 60) as ASECONDS
     , min   (ASECONDS)          as ASECONDS_OPEN
     , max   (ASECONDS)          as ASECONDS_CLOSE
     , min   (AMIN)              as AMIN
     , max   (AMAX)              as AMAX
     , sum   (AVOLUME)           as AVOLUME
     , count (*)                 as COUNT_DUMMY
     , count (AVOLUME)           as COUNT_DUMMY2
from V5_RATES_30MIN
group by trunc (ASECONDS / (60 * 60)) * (60 * 60);
 
create unique index V5_RATES_60MIN_PRELIMINARY_I1 on V5_RATES_60MIN_PRELIMINARY (ASECONDS);
 
create materialized view log on V5_RATES_60MIN_PRELIMINARY
tablespace USERS pctfree 0
with rowid, commit scn, sequence (ASECONDS, ASECONDS_OPEN, ASECONDS_CLOSE)
including new values;
 
create materialized view V5_RATES_60MIN
            tablespace USERS pctfree 0 build immediate
using index tablespace USERS pctfree 0
refresh fast on commit with rowid as
select a.ASECONDS
     , b.AOPEN, c.ACLOSE                           -- из связанных таблиц
     , a.AMIN, a.AMAX
     , a.AVOLUME
     , a.rowid as R1, b.rowid as R2, c.rowid as R3 -- это необходимо для join matview
from V5_RATES_60MIN_PRELIMINARY a, V5_RATES_30MIN b, V5_RATES_30MIN c
where a.ASECONDS_OPEN = b.ASECONDS and a.ASECONDS_CLOSE = c.ASECONDS;
 
--create unique index V5_RATES_60MIN_I1 on V5_RATES_60MIN (ASECONDS);
alter table V5_RATES_60MIN add constraint V5_RATES_60MIN_I1 unique (ASECONDS) initially deferred;