47.6. ÐодÑли вÑвода логиÑеÑкого декодиÑÐ¾Ð²Ð°Ð½Ð¸Ñ #
ÐÑÐ¸Ð¼ÐµÑ Ð¼Ð¾Ð´ÑÐ»Ñ Ð²Ñвода можно найÑи в подкаÑалоге contrib/test_decoding в деÑеве иÑÑ
одного кода Postgres Pro.
47.6.1. ФÑнкÑÐ¸Ñ Ð¸Ð½Ð¸ÑиализаÑии #
ÐодÑÐ»Ñ Ð²Ñвода загÑÑжаеÑÑÑ Ð² ÑезÑлÑÑаÑе динамиÑеÑкой загÑÑзки ÑазделÑемой библиоÑеки (пÑи ÑÑом в каÑеÑÑве имени библиоÑеки задаÑÑÑÑ Ð¸Ð¼Ñ Ð¼Ð¾Ð´ÑлÑ). ÐÐ»Ñ Ð½Ð°Ñ
Ð¾Ð¶Ð´ÐµÐ½Ð¸Ñ Ð±Ð¸Ð±Ð»Ð¸Ð¾Ñеки пÑименÑеÑÑÑ Ð¾Ð±ÑÑнÑй пÑÑÑ Ð¿Ð¾Ð¸Ñка библиоÑек. Ð ÑÑой библиоÑеке должна бÑÑÑ ÑÑнкÑÐ¸Ñ _PG_output_plugin_init, коÑоÑÐ°Ñ Ð¿Ð¾ÐºÐ°Ð·ÑваеÑ, ÑÑо библиоÑека на Ñамом деле пÑедÑÑавлÑÐµÑ Ñобой модÑÐ»Ñ Ð²Ñвода, и ÑÑÑÐ°Ð½Ð°Ð²Ð»Ð¸Ð²Ð°ÐµÑ ÑÑебÑемÑе обÑабоÑÑики модÑÐ»Ñ Ð²Ñвода. ÐÑой ÑÑнкÑии пеÑедаÑÑÑÑ ÑÑÑÑкÑÑÑа, в коÑоÑой Ð´Ð¾Ð»Ð¶Ð½Ñ Ð±ÑÑÑ Ð·Ð°Ð¿Ð¾Ð»Ð½ÐµÐ½Ñ ÑказаÑели на ÑÑнкÑии-обÑабоÑÑики оÑделÑнÑÑ
дейÑÑвий.
typedef struct OutputPluginCallbacks
{
LogicalDecodeStartupCB startup_cb;
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
LogicalDecodeFilterPrepareCB filter_prepare_cb;
LogicalDecodeBeginPrepareCB begin_prepare_cb;
LogicalDecodePrepareCB prepare_cb;
LogicalDecodeCommitPreparedCB commit_prepared_cb;
LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
LogicalDecodeStreamPrepareCB stream_prepare_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); ÐбÑабоÑÑики begin_cb, change_cb и commit_cb Ð´Ð¾Ð»Ð¶Ð½Ñ ÑÑÑанавливаÑÑÑÑ Ð¾Ð±ÑзаÑелÑно, а startup_cb, truncate_cb, message_cb, filter_by_origin_cb и shutdown_cb могÑÑ Ð¾ÑÑÑÑÑÑвоваÑÑ. ÐÑли truncate_cb не ÑÑÑановлен, но поÑÑебÑеÑÑÑ Ð´ÐµÐºÐ¾Ð´Ð¸ÑоваÑÑ Ð¾Ð¿ÐµÑаÑÐ¸Ñ TRUNCATE, она бÑÐ´ÐµÑ Ð¿ÑоигноÑиÑована.
ÐодÑÐ»Ñ Ð²Ñвода Ð¼Ð¾Ð¶ÐµÑ Ñакже опÑеделÑÑÑ ÑÑнкÑии Ð´Ð»Ñ Ð¿Ð¾Ð´Ð´ÐµÑжки поÑоковой пеÑедаÑи болÑÑиÑ
ÑÑанзакÑий во вÑÐµÐ¼Ñ Ð¸Ñ
вÑполнениÑ. ФÑнкÑии stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb и stream_change_cb ÑвлÑÑÑÑÑ Ð¾Ð±ÑзаÑелÑнÑми, а stream_message_cb и stream_truncate_cb â необÑзаÑелÑнÑми. ФÑнкÑÐ¸Ñ stream_prepare_cb Ñакже ÑвлÑеÑÑÑ Ð¾Ð±ÑзаÑелÑной, еÑли модÑÐ»Ñ Ð²Ñвода поддеÑÐ¶Ð¸Ð²Ð°ÐµÑ Ð´Ð²ÑÑ
ÑазнÑÑ ÑикÑаÑиÑ.
ÐодÑÐ»Ñ Ð²Ñвода Ñакже Ð¼Ð¾Ð¶ÐµÑ Ð¾Ð¿ÑеделÑÑÑ ÑÑнкÑии Ð´Ð»Ñ Ð¿Ð¾Ð´Ð´ÐµÑжки двÑÑ
Ñазной ÑикÑаÑии, позволÑÑÑие декодиÑоваÑÑ Ð´ÐµÐ¹ÑÑÐ²Ð¸Ñ Ð² PREPARE TRANSACTION. ÐбÑабоÑÑики begin_prepare_cb, prepare_cb, commit_prepared_cb и rollback_prepared_cb ÑвлÑÑÑÑÑ Ð¾Ð±ÑзаÑелÑнÑми, а filter_prepare_cb â неÑ. ÐбÑабоÑÑик stream_prepare_cb Ñакже ÑвлÑеÑÑÑ Ð¾Ð±ÑзаÑелÑнÑм, еÑли модÑÐ»Ñ Ð²Ñвода поддеÑÐ¶Ð¸Ð²Ð°ÐµÑ Ð¿Ð¾ÑоковÑÑ Ð¿ÐµÑедаÑÑ Ð±Ð¾Ð»ÑÑиÑ
вÑполнÑÑÑиÑ
ÑÑ ÑÑанзакÑий.
47.6.2. ÐозможноÑÑи #
ÐÐ»Ñ Ð´ÐµÐºÐ¾Ð´Ð¸ÑованиÑ, ÑоÑмаÑиÑÐ¾Ð²Ð°Ð½Ð¸Ñ Ð¸ вÑвода изменений модÑли вÑвода могÑÑ Ð¸ÑполÑзоваÑÑ Ð¿ÑакÑиÑеÑки вÑÑ Ð¾Ð±ÑÑнÑÑ Ð¸Ð½ÑÑаÑÑÑÑкÑÑÑÑ ÑеÑвеÑа, вклÑÑÐ°Ñ Ð²Ñзов ÑÑнкÑий вÑвода Ñипов. РоÑноÑениÑм ÑазÑеÑаеÑÑÑ Ð´Ð¾ÑÑÑп ÑолÑко на ÑÑение, еÑли ÑолÑко ÑÑи оÑноÑÐµÐ½Ð¸Ñ Ð±Ñли ÑÐ¾Ð·Ð´Ð°Ð½Ñ Ð¿ÑогÑаммой initdb в ÑÑ
еме pg_catalog, либо помеÑÐµÐ½Ñ ÐºÐ°Ðº полÑзоваÑелÑÑкие ÑаблиÑÑ ÐºÐ°Ñалогов командами
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
ÐбÑаÑиÑе внимание, ÑÑо доÑÑÑп к полÑзоваÑелÑÑким ÑаблиÑам каÑалога или обÑÑнÑм ÑиÑÑемнÑм ÑаблиÑам каÑалога в подклÑÑаемÑÑ
модÑлÑÑ
вÑвода должен оÑÑÑеÑÑвлÑÑÑÑÑ ÑолÑко Ñ Ð¸ÑполÑзованием ÑÑнкÑий ÑканиÑÐ¾Ð²Ð°Ð½Ð¸Ñ systable_*. ÐÑи попÑÑке иÑполÑзоваÑÑ ÑÑнкÑии ÑканиÑÐ¾Ð²Ð°Ð½Ð¸Ñ heap_* пÑоизойдÑÑ Ð¾Ñибка. ÐÑоме Ñого, лÑбÑе дейÑÑвиÑ, коÑоÑÑе ÑÑебÑÑÑ Ð¿ÑиÑÐ²Ð°Ð¸Ð²Ð°Ð½Ð¸Ñ Ð¸Ð´ÐµÐ½ÑиÑикаÑоÑа ÑÑанзакÑии, запÑеÑаÑÑÑÑ. Ð ÑаÑÑноÑÑи, к ÑÑим дейÑÑвиÑм оÑноÑÑÑÑÑ Ð¾Ð¿ÐµÑаÑии запиÑи в ÑаблиÑÑ, Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ DDL и вÑзов pg_current_xact_id().
47.6.3. Ð ÐµÐ¶Ð¸Ð¼Ñ Ð²Ñвода #
ÐбÑабоÑÑики в модÑле вÑвода могÑÑ Ð¿ÐµÑедаваÑÑ Ð´Ð°Ð½Ð½Ñе поÑÑебиÑÐµÐ»Ñ Ð² пÑакÑиÑеÑки лÑбÑÑ
ÑоÑмаÑаÑ
. ÐÐ»Ñ Ð½ÐµÐºÐ¾ÑоÑÑÑ
ваÑианÑов иÑполÑзованиÑ, напÑимеÑ, пÑоÑмоÑÑа изменений ÑеÑез SQL, вÑвод инÑоÑмаÑии в ÑипаÑ
, коÑоÑÑе могÑÑ ÑодеÑжаÑÑ Ð¿ÑоизволÑнÑе даннÑе (напÑимеÑ, bytea), Ð¼Ð¾Ð¶ÐµÑ Ð±ÑÑÑ Ð½ÐµÑдобоваÑимÑм. ÐÑли модÑÐ»Ñ Ð²Ñвода вÑÐ²Ð¾Ð´Ð¸Ñ ÑолÑко ÑекÑÑовÑе даннÑе в кодиÑовке ÑеÑвеÑа, он Ð¼Ð¾Ð¶ÐµÑ Ð¾Ð±ÑÑвиÑÑ ÑÑо, ÑÑÑановив в OutputPluginOptions.output_type знаÑение OUTPUT_PLUGIN_TEXTUAL_OUTPUT вмеÑÑо OUTPUT_PLUGIN_BINARY_OUTPUT в обÑабоÑÑике запÑÑка. Ð ÑÑом ÑлÑÑае вÑе даннÑе Ð´Ð¾Ð»Ð¶Ð½Ñ Ð±ÑÑÑ Ð² кодиÑовке ÑеÑвеÑа, ÑÑÐ¾Ð±Ñ Ð¸Ñ
можно бÑло пеÑедаÑÑ Ð² знаÑении Ñипа text. ÐÑо конÑÑолиÑÑеÑÑÑ Ð² ÑбоÑкаÑ
Ñ Ð²ÐºÐ»ÑÑÑннÑми пÑовеÑоÑнÑми ÑÑвеÑждениÑми.
47.6.4. ÐбÑабоÑÑики в модÑле вÑвода #
ÐодÑÐ»Ñ Ð²Ñвода ÑведомлÑеÑÑÑ Ð¾ пÑоиÑÑ Ð¾Ð´ÑÑÐ¸Ñ Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸ÑÑ ÑеÑез ÑазлиÑнÑе обÑабоÑÑики, коÑоÑÑе он должен ÑÑÑановиÑÑ.
ÐаÑаллелÑнÑе ÑÑанзакÑии декодиÑÑÑÑÑÑ Ð² поÑÑдке ÑикÑаÑии, пÑи ÑÑом Ð¼ÐµÐ¶Ð´Ñ Ð¾Ð±ÑаÑнÑми вÑзовами begin и commit декодиÑÑÑÑÑÑ ÑолÑко изменениÑ, оÑноÑÑÑиеÑÑ Ðº опÑеделÑнной ÑÑанзакÑии. Явно или неÑвно оÑменÑннÑе ÑÑанзакÑии никогда не декодиÑÑÑÑÑÑ. УÑпеÑнÑе ÑоÑки ÑоÑ
ÑÐ°Ð½ÐµÐ½Ð¸Ñ Ð²ÐºÐ»ÑÑаÑÑÑÑ Ð² ÑодеÑжаÑÑÑ Ð¸Ñ
ÑÑанзакÑÐ¸Ñ Ð² Ñом поÑÑдке, в коÑоÑом они вÑполнÑлиÑÑ Ð² ÑÑой ÑÑанзакÑии. ТÑанзакÑиÑ, подгоÑÐ¾Ð²Ð»ÐµÐ½Ð½Ð°Ñ Ð´Ð»Ñ Ð´Ð²ÑÑ
Ñазной ÑикÑаÑии командой PREPARE TRANSACTION, Ñакже бÑÐ´ÐµÑ Ð´ÐµÐºÐ¾Ð´Ð¸Ñована, еÑли модÑÐ»Ñ Ð²Ñвода пÑедоÑÑавил ÑÑебÑемÑе обÑабоÑÑики Ð´Ð»Ñ Ð´ÐµÐºÐ¾Ð´Ð¸ÑованиÑ. Ðозможно, ÑÑо ÑекÑÑÐ°Ñ Ð¿Ð¾Ð´Ð³Ð¾ÑÐ¾Ð²Ð»ÐµÐ½Ð½Ð°Ñ Ð´ÐµÐºÐ¾Ð´Ð¸ÑÑÐµÐ¼Ð°Ñ ÑÑанзакÑÐ¸Ñ Ð±ÑÐ´ÐµÑ Ð¿ÑеÑвана паÑаллелÑно вÑполненной командой ROLLBACK PREPARED. Ð ÑÑом ÑлÑÑае бÑÐ´ÐµÑ Ð¿ÑеÑвано и логиÑеÑкое декодиÑование ÑÑой ÑÑанзакÑии. ÐÑе Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ Ñакой ÑÑанзакÑии пÑопÑÑкаÑÑÑÑ Ð¿Ð¾Ñле обнаÑÑÐ¶ÐµÐ½Ð¸Ñ Ð¿ÑеÑÑÐ²Ð°Ð½Ð¸Ñ Ð¸ вÑзова обÑабоÑÑика prepare_cb. Таким обÑазом даже пÑи блокиÑÑÑÑем пÑеÑÑвании модÑÐ»Ñ Ð²Ñвода пÑедоÑÑавлÑеÑÑÑ Ð´Ð¾ÑÑаÑоÑно инÑоÑмаÑии, ÑÑÐ¾Ð±Ñ Ð¾Ð½ мог пÑавилÑно обÑабоÑаÑÑ ROLLBACK PREPARED поÑле декодиÑованиÑ.
ÐÑимеÑание
ÐекодиÑоваÑÑÑÑ Ð±ÑдÑÑ ÑолÑко Ñе ÑÑанзакÑии, коÑоÑÑе Ñже ÑÑпеÑно ÑбÑоÑÐµÐ½Ñ Ð½Ð° диÑк. ÐÑледÑÑвие ÑÑого, COMMIT Ð¼Ð¾Ð¶ÐµÑ Ð½Ðµ декодиÑоваÑÑÑÑ Ð² ÑледÑÑÑем ÑÑÐ°Ð·Ñ Ð·Ð° ним вÑзове pg_logical_slot_get_changes(), когда synchronous_commit Ð¸Ð¼ÐµÐµÑ Ð·Ð½Ð°Ñение off.
47.6.4.1. ÐбÑабоÑÑик запÑÑка #
ÐеобÑзаÑелÑнÑй обÑабоÑÑик startup_cb вÑзÑваеÑÑÑ, когда ÑÐ»Ð¾Ñ ÑепликаÑии ÑоздаÑÑÑÑ Ð¸Ð»Ð¸ ÑеÑез него запÑаÑиваеÑÑÑ Ð¿ÐµÑедаÑа изменений, незавиÑимо Ð¾Ñ Ñого, в каком колиÑеÑÑве Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ Ð³Ð¾ÑÐ¾Ð²Ñ Ðº пеÑедаÑе.
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
OutputPluginOptions *options,
bool is_init); ÐаÑамеÑÑ is_init бÑÐ´ÐµÑ Ñавен true, когда ÑÐ»Ð¾Ñ ÑепликаÑии ÑоздаÑÑÑÑ, и false в пÑоÑивном ÑлÑÑае. ÐаÑамеÑÑ options ÑказÑÐ²Ð°ÐµÑ Ð½Ð° ÑÑÑÑкÑÑÑÑ Ð¿Ð°ÑамеÑÑов, коÑоÑÑе могÑÑ ÑÑÑанавливаÑÑ Ð¼Ð¾Ð´Ñли вÑвода:
typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
bool receive_rewrites;
} OutputPluginOptions; Рполе output_type должно бÑÑÑ Ð·Ð½Ð°Ñение OUTPUT_PLUGIN_TEXTUAL_OUTPUT или OUTPUT_PLUGIN_BINARY_OUTPUT. См. Ñакже ÐодÑаздел 47.6.3. ÐÑли поле receive_rewrites Ñавно true, модÑÐ»Ñ Ð²Ñвода Ñакже бÑÐ´ÐµÑ Ð²ÑзÑваÑÑÑÑ Ð´Ð»Ñ Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ð¹, ÑвÑзаннÑÑ
Ñ Ð¿ÐµÑезапиÑÑÑ ÐºÑÑи пÑи опÑеделÑннÑÑ
опеÑаÑиÑÑ
DDL. ÐÑи Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ Ð¿ÑедÑÑавлÑÑÑ Ð¸Ð½ÑеÑÐµÑ Ð´Ð»Ñ Ð¼Ð¾Ð´Ñлей, оÑÑÑеÑÑвлÑÑÑиÑ
ÑепликаÑÐ¸Ñ DDL, но Ð´Ð»Ñ Ð¸Ñ
обÑабоÑки Ð¼Ð¾Ð¶ÐµÑ Ð¿Ð¾ÑÑебоваÑÑÑÑ Ð¾ÑобÑй подÑ
од.
ÐбÑабоÑÑик запÑÑка должен пÑовеÑиÑÑ Ð¿Ð°ÑамеÑÑÑ, пÑедÑÑавленнÑе в ctx->output_plugin_options. ÐÑли модÑÐ»Ñ Ð²Ñвода ÑÑебÑеÑÑÑ Ð¿Ð¾Ð´Ð´ÐµÑживаÑÑ ÑоÑÑоÑние, он Ð¼Ð¾Ð¶ÐµÑ ÑоÑ
ÑаниÑÑ ÐµÐ³Ð¾ в ctx->output_plugin_private.
47.6.4.2. ÐбÑабоÑÑик вÑклÑÑÐµÐ½Ð¸Ñ #
ÐеобÑзаÑелÑнÑй обÑабоÑÑик shutdown_cb вÑзÑваеÑÑÑ, когда Ñанее акÑивнÑй ÑÐ»Ð¾Ñ ÑепликаÑии пеÑеÑÑаÑÑ Ð¸ÑполÑзоваÑÑÑÑ, Ñак ÑÑо ÑеÑÑÑÑÑ, занÑÑÑе модÑлем вÑвода, можно оÑвободиÑÑ. ÐÑи ÑÑом ÑÐ»Ð¾Ñ Ð½Ðµ обÑзаÑелÑно ÑдалÑеÑÑÑ, пÑекÑаÑаеÑÑÑ ÑолÑко поÑÐ¾ÐºÐ¾Ð²Ð°Ñ Ð¿ÐµÑедаÑа ÑеÑез него.
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
47.6.4.3. ÐбÑабоÑÑик наÑала ÑÑанзакÑии #
ÐбÑзаÑелÑнÑй обÑабоÑÑик begin_cb вÑзÑваеÑÑÑ, когда декодиÑÑеÑÑÑ Ð½Ð°Ñало заÑикÑиÑованной ÑÑанзакÑии. ÐÑеÑваннÑе ÑÑанзакÑии и иÑ
ÑодеÑжимое никогда не декодиÑÑеÑÑÑ.
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn); ÐаÑамеÑÑ txn ÑодеÑÐ¶Ð¸Ñ Ð¼ÐµÑаинÑоÑмаÑÐ¸Ñ Ð¾ ÑÑанзакÑии, в ÑаÑÑноÑÑи ÐµÑ Ð¸Ð´ÐµÐ½ÑиÑикаÑÐ¾Ñ Ð¸ вÑÐµÐ¼Ñ ÐµÑ ÑикÑиÑованиÑ.
47.6.4.4. ÐбÑабоÑÑик завеÑÑÐµÐ½Ð¸Ñ ÑÑанзакÑии #
ÐбÑзаÑелÑнÑй обÑабоÑÑик commit_cb вÑзÑваеÑÑÑ, когда декодиÑÑеÑÑÑ ÑикÑиÑование ÑÑанзакÑии. ÐеÑед ÑÑим обÑабоÑÑиком бÑÐ´ÐµÑ Ð²ÑзÑваÑÑÑÑ Ð¾Ð±ÑабоÑÑик change_cb Ð´Ð»Ñ Ð²ÑеÑ
изменÑннÑÑ
ÑÑÑок (еÑли ÑÑÑоки бÑли измененÑ).
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);47.6.4.5. ÐбÑабоÑÑик Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ #
ÐбÑзаÑелÑнÑй обÑабоÑÑик change_cb вÑзÑваеÑÑÑ Ð´Ð»Ñ ÐºÐ°Ð¶Ð´Ð¾Ð³Ð¾ оÑделÑного Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ ÑÑÑоки в ÑÑанзакÑии, пÑоизводимого командами INSERT, UPDATE или DELETE. Ðаже еÑли команда изменила неÑколÑко ÑÑÑок ÑÑазÑ, ÑÑÐ¾Ñ Ð¾Ð±ÑабоÑÑик бÑÐ´ÐµÑ Ð²ÑзÑваÑÑÑÑ Ð´Ð»Ñ ÐºÐ°Ð¶Ð´Ð¾Ð¹ оÑделÑной ÑÑÑоки. ÐбÑабоÑÑик change_cb Ð¼Ð¾Ð¶ÐµÑ Ð¾Ð±ÑаÑаÑÑÑÑ Ðº ÑиÑÑемнÑм или полÑзоваÑелÑÑким ÑаблиÑам каÑалога, ÑÑÐ¾Ð±Ñ Ð´Ð¾Ð¿Ð¾Ð»Ð½Ð¸ÑÑ Ð²ÑводимÑÑ Ð¸Ð½ÑоÑмаÑÐ¸Ñ Ð¾Ð± изменении ÑÑÑоки. Ð ÑлÑÑае декодиÑÐ¾Ð²Ð°Ð½Ð¸Ñ Ð¿Ð¾Ð´Ð³Ð¾Ñовленной (но еÑÑ Ð½Ðµ заÑикÑиÑованной) ÑÑанзакÑии или декодиÑÐ¾Ð²Ð°Ð½Ð¸Ñ Ð½ÐµÐ·Ð°ÑикÑиÑованной ÑÑанзакÑии ÑÑÐ¾Ñ Ð¾Ð±ÑабоÑÑик изменений Ñакже Ð¼Ð¾Ð¶ÐµÑ Ð²ÑдаÑÑ Ð¾ÑÐ¸Ð±ÐºÑ Ð¸Ð·-за одновÑеменного оÑкаÑа Ñой же Ñамой ÑÑанзакÑии. Ð ÑÑом ÑлÑÑае логиÑеÑкое декодиÑование ÑÑой пÑеÑванной ÑÑанзакÑии коÑÑекÑно оÑÑанавливаеÑÑÑ.
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change); ÐаÑамеÑÑÑ ctx и txn имеÑÑ Ñо же ÑодеÑжимое, ÑÑо и Ð´Ð»Ñ Ð¾Ð±ÑабоÑÑиков begin_cb и commit_cb; дополниÑелÑнÑй деÑкÑипÑÐ¾Ñ Ð¾ÑноÑÐµÐ½Ð¸Ñ relation ÑказÑÐ²Ð°ÐµÑ Ð½Ð° оÑноÑение, к коÑоÑÐ¾Ð¼Ñ Ð¿ÑÐ¸Ð½Ð°Ð´Ð»ÐµÐ¶Ð¸Ñ ÑÑÑока, а ÑÑÑÑкÑÑÑа change опиÑÑÐ²Ð°ÐµÑ Ð¿ÐµÑедаваемое изменение ÑÑÑоки.
ÐÑимеÑание
РпÑоÑеÑÑе логиÑеÑкого декодиÑÐ¾Ð²Ð°Ð½Ð¸Ñ Ð¼Ð¾Ð³ÑÑ Ð±ÑÑÑ Ð¾Ð±ÑабоÑÐ°Ð½Ñ Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ ÑолÑко в ÑаблиÑаÑ
, не ÑвлÑÑÑиÑ
ÑÑ Ð½ÐµÐ¶ÑÑналиÑÑемÑми (Ñм. опиÑание UNLOGGED) или вÑеменнÑми (Ñм. опиÑание TEMPORARY или TEMP).
47.6.4.6. ÐбÑабоÑÑик опÑÑÑоÑÐµÐ½Ð¸Ñ #
ÐеобÑзаÑелÑнÑй обÑабоÑÑик truncate_cb вÑзÑваеÑÑÑ Ð´Ð»Ñ ÐºÐ¾Ð¼Ð°Ð½Ð´Ñ TRUNCATE.
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change); Ðн полÑÑÐ°ÐµÑ Ñе же паÑамеÑÑÑ, ÑÑо и change_cb. Ðо Ñак как опеÑаÑии TRUNCATE в ÑаблиÑаÑ
, ÑвÑзаннÑÑ
внеÑними клÑÑами, Ð´Ð¾Ð»Ð¶Ð½Ñ Ð²ÑполнÑÑÑÑÑ Ð¾Ð´Ð½Ð¾Ð²Ñеменно, даннÑй обÑабоÑÑик полÑÑÐ°ÐµÑ Ð½Ð° вÑ
од не одно оÑноÑение, а маÑÑив оÑноÑений. Ðа подÑобноÑÑÑми обÑаÑиÑеÑÑ Ðº опиÑÐ°Ð½Ð¸Ñ Ð¾Ð¿ÐµÑаÑоÑа TRUNCATE.
47.6.4.7. ÐбÑабоÑÑик ÑилÑÑÑаÑии иÑÑоÑника #
ÐеобÑзаÑелÑнÑй обÑабоÑÑик filter_by_origin_cb вÑзÑваеÑÑÑ, ÑÑÐ¾Ð±Ñ Ð¾ÑмеÑиÑÑ, инÑеÑеÑÑÑÑ Ð»Ð¸ модÑÐ»Ñ Ð²Ñвода изменениÑ, воÑпÑоизводимÑе из Ñказанного иÑÑоÑника (origin_id).
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
RepOriginId origin_id); РпаÑамеÑÑе ctx пеÑедаÑÑÑÑ Ñа же инÑоÑмаÑиÑ, ÑÑо и Ð´Ð»Ñ Ð´ÑÑгиÑ
обÑабоÑÑиков. ЧÑÐ¾Ð±Ñ Ð¾ÑмеÑиÑÑ, ÑÑо изменениÑ, поÑÑÑпаÑÑие из пеÑеданного Ñзла, не пÑедÑÑавлÑÑÑ Ð¸Ð½ÑеÑеÑа, модÑÐ»Ñ Ð´Ð¾Ð»Ð¶ÐµÐ½ веÑнÑÑÑ true, вÑледÑÑвие Ñего ÑÑи Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ Ð±ÑдÑÑ ÑилÑÑÑоваÑÑÑÑ; в пÑоÑивном ÑлÑÑае он должен веÑнÑÑÑ false. ÐÑÑгие обÑабоÑÑики Ð´Ð»Ñ ÑилÑÑÑÑемÑÑ
ÑÑанзакÑий и изменений вÑзÑваÑÑÑÑ Ð½Ðµ бÑдÑÑ.
ÐÑо полезно пÑи ÑеализаÑии каÑкадной или ÑазнонапÑавленной ÑепликаÑии. ФилÑÑÑаÑÐ¸Ñ Ð¿Ð¾ иÑÑоÑÐ½Ð¸ÐºÑ Ð² ÑÐ°ÐºÐ¸Ñ ÐºÐ¾Ð½ÑигÑÑаÑиÑÑ Ð¿Ð¾Ð·Ð²Ð¾Ð»ÑÐµÑ Ð¿ÑедоÑвÑаÑиÑÑ Ð¿ÐµÑедаÑÑ Ð²Ð·Ð°Ð´-впеÑÑд Ð¾Ð´Ð½Ð¸Ñ Ð¸ ÑÐµÑ Ð¶Ðµ изменений. ХоÑÑ Ð¸Ð½ÑоÑмаÑÐ¸Ñ Ð¾Ð± иÑÑоÑнике можно Ñакже извлеÑÑ Ð¸Ð· ÑÑанзакÑий и изменений, ÑилÑÑÑаÑÐ¸Ñ Ñ Ð¿Ð¾Ð¼Ð¾ÑÑÑ ÑÑого обÑабоÑÑика гоÑаздо более ÑÑÑекÑивна.
47.6.4.8. ÐбÑабоÑÑик пÑоизволÑнÑÑ ÑообÑений #
ÐеобÑзаÑелÑнÑй обÑабоÑÑик message_cb вÑзÑваеÑÑÑ Ð¿Ñи полÑÑении ÑообÑÐµÐ½Ð¸Ñ Ð»Ð¾Ð³Ð¸ÑеÑкого декодиÑованиÑ.
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message); ÐаÑамеÑÑ txn ÑодеÑÐ¶Ð¸Ñ Ð¼ÐµÑаинÑоÑмаÑÐ¸Ñ Ð¾ ÑÑанзакÑии, вклÑÑÐ°Ñ Ð²ÑÐµÐ¼Ñ ÐµÑ ÑикÑаÑии и ÐµÑ XID. ÐамеÑÑÑе, однако, ÑÑо в нÑм Ð¼Ð¾Ð¶ÐµÑ Ð¿ÐµÑедаваÑÑÑÑ NULL, когда ÑообÑение неÑÑанзакÑионное и ÑÑанзакÑии, в коÑоÑой бÑло вÑдано ÑообÑение, еÑÑ Ð½Ðµ назнаÑен XID. РпаÑамеÑÑе lsn оÑмеÑаеÑÑÑ Ð¿Ð¾Ð·Ð¸ÑÐ¸Ñ ÑообÑÐµÐ½Ð¸Ñ Ð² WAL. ÐаÑамеÑÑ transactional показÑваеÑ, бÑло ли ÑообÑение пеÑедано как ÑÑанзакÑионное. Ðодобно обÑабоÑÑÐ¸ÐºÑ Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ð¹, в ÑлÑÑае декодиÑÐ¾Ð²Ð°Ð½Ð¸Ñ Ð¿Ð¾Ð´Ð³Ð¾Ñовленной (но еÑÑ Ð½Ðµ заÑикÑиÑованной) ÑÑанзакÑии или декодиÑÐ¾Ð²Ð°Ð½Ð¸Ñ Ð½ÐµÐ·Ð°ÑикÑиÑованной ÑÑанзакÑии ÑÑÐ¾Ñ Ð¾Ð±ÑабоÑÑик Ñакже Ð¼Ð¾Ð¶ÐµÑ Ð²ÑдаÑÑ Ð¾ÑÐ¸Ð±ÐºÑ Ð¸Ð·-за одновÑеменного оÑкаÑа Ñой же Ñамой ÑÑанзакÑии. Ð ÑÑом ÑлÑÑае логиÑеÑкое декодиÑование ÑÑой пÑеÑванной ÑÑанзакÑии коÑÑекÑно оÑÑанавливаеÑÑÑ. РпаÑамеÑÑе prefix пеÑедаÑÑÑÑ Ð½ÐµÐºÐ¾ÑоÑÑй пÑеÑÐ¸ÐºÑ (завеÑÑаÑÑийÑÑ Ð½ÑлÑм), по коÑоÑÐ¾Ð¼Ñ ÑекÑÑий модÑÐ»Ñ Ð¼Ð¾Ð¶ÐµÑ Ð²ÑделÑÑÑ Ð¸Ð½ÑеÑеÑÑÑÑие его ÑообÑениÑ. РнаконеÑ, паÑамеÑÑ message ÑодеÑÐ¶Ð¸Ñ Ñамо ÑообÑение ÑазмеÑом message_size байÑ.
ÐÐµÐ¾Ð±Ñ Ð¾Ð´Ð¸Ð¼Ð¾ дополниÑелÑно позабоÑиÑÑÑÑ Ð¾ Ñом, ÑÑÐ¾Ð±Ñ Ð¿ÑеÑикÑ, опÑеделÑÑÑий инÑеÑеÑÑÑÑие модÑÐ»Ñ Ð²Ñвода ÑообÑениÑ, бÑл ÑникалÑнÑм. УдаÑнÑм вÑбоÑом обÑÑно бÑÐ´ÐµÑ Ð¸Ð¼Ñ ÑаÑÑиÑÐµÐ½Ð¸Ñ Ð¸Ð»Ð¸ Ñамого модÑÐ»Ñ Ð²Ñвода.
47.6.4.9. ÐбÑабоÑÑик ÑилÑÑÑа подгоÑовки #
ÐеобÑзаÑелÑнÑй обÑабоÑÑик filter_prepare_cb вÑзÑваеÑÑÑ, ÑÑÐ¾Ð±Ñ Ð¾Ð¿ÑеделиÑÑ, ÑледÑÐµÑ Ð»Ð¸ декодиÑоваÑÑ Ð´Ð°Ð½Ð½Ñе, ÑоÑÑавлÑÑÑие ÑекÑÑÑÑ ÑикÑиÑÑемÑÑ Ð´Ð²ÑÑ
ÑазнÑÑ ÑÑанзакÑиÑ, на ÑÑапе подгоÑовки или позже â в виде обÑÑной одноÑазной ÑÑанзакÑии на ÑÑапе COMMIT PREPARED. ÐÑли декодиÑование ÑледÑÐµÑ Ð¿ÑопÑÑÑиÑÑ, обÑабоÑÑик должен вÑдаÑÑ true, а инаÑе â false. ÐÑли обÑабоÑÑик не опÑеделÑн, пÑедполагаеÑÑÑ false (Ñ. е. ÑилÑÑÑаÑÐ¸Ñ Ð¾ÑÑÑÑÑÑвÑÐµÑ â вÑе ÑÑанзакÑии, иÑполÑзÑÑÑие двÑÑ
ÑазнÑÑ ÑикÑаÑиÑ, декодиÑÑÑÑÑÑ Ñакже в две ÑазÑ).
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid); ÐаÑамеÑÑ ctx Ð¸Ð¼ÐµÐµÑ Ñо же ÑодеÑжимое, ÑÑо и в дÑÑгиÑ
обÑабоÑÑикаÑ
. ÐаÑамеÑÑÑ xid и gid пÑедоÑÑавлÑÑÑ Ð´Ð²Ð° ÑазнÑÑ
ÑпоÑоба иденÑиÑикаÑии ÑÑанзакÑии. ÐоÑледÑÑÑÐ°Ñ ÐºÐ¾Ð¼Ð°Ð½Ð´Ð° COMMIT PREPARED или ROLLBACK PREPARED пеÑедаÑÑ Ð¾Ð±Ð° иденÑиÑикаÑоÑа, пÑедоÑÑавлÑÑ Ð¼Ð¾Ð´ÑÐ»Ñ Ð²Ñвода возможноÑÑÑ Ð²ÑбоÑа, какой иÑполÑзоваÑÑ.
ÐбÑабоÑÑик Ð¼Ð¾Ð¶ÐµÑ Ð²ÑзÑваÑÑÑÑ Ð½ÐµÑколÑко Ñаз в ÑеÑение одной декодиÑÑемой ÑÑанзакÑии и должен вÑдаваÑÑ Ð¾Ð´Ð¸Ð½ и ÑÐ¾Ñ Ð¶Ðµ поÑÑоÑннÑй оÑÐ²ÐµÑ Ð´Ð»Ñ Ð´Ð°Ð½Ð½Ð¾Ð¹ паÑÑ xid и gid пÑи каждом вÑзове.
47.6.4.10. ÐбÑабоÑÑик наÑала подгоÑовленной ÑÑанзакÑии #
ÐбÑзаÑелÑнÑй обÑабоÑÑик begin_prepare_cb вÑзÑваеÑÑÑ Ð¿Ñи декодиÑовании наÑала подгоÑовленной ÑÑанзакÑии. ÐеÑеданное в паÑамеÑÑе txn поле gid позволÑÐµÑ Ð¾Ð±ÑабоÑÑÐ¸ÐºÑ Ð¾Ð¿ÑеделиÑÑ, полÑÑал ли ÑÑÐ¾Ñ Ð¼Ð¾Ð´ÑÐ»Ñ Ñже даннÑÑ ÐºÐ¾Ð¼Ð°Ð½Ð´Ñ PREPARE, â еÑли да, он Ð¼Ð¾Ð¶ÐµÑ Ð»Ð¸Ð±Ð¾ вÑдаÑÑ Ð¾ÑибкÑ, либо пÑопÑÑÑиÑÑ Ð¾ÑÑавÑиеÑÑ Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ ÑÑанзакÑии.
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);47.6.4.11. ÐбÑабоÑÑик подгоÑовки ÑÑанзакÑии #
ÐбÑзаÑелÑнÑй обÑабоÑÑик prepare_cb вÑзÑваеÑÑÑ Ð¿Ñи декодиÑовании ÑÑанзакÑии, подгоÑовленной Ð´Ð»Ñ Ð´Ð²ÑÑ
Ñазной ÑикÑаÑии. ÐеÑед ÑÑим обÑабоÑÑиком бÑÐ´ÐµÑ Ð²ÑзÑваÑÑÑÑ Ð¾Ð±ÑабоÑÑик change_cb Ð´Ð»Ñ Ð²ÑеÑ
изменÑннÑÑ
ÑÑÑок (еÑли ÑÑÑоки бÑли измененÑ). Ð ÑÑом обÑабоÑÑике Ð¼Ð¾Ð¶ÐµÑ Ð¸ÑполÑзоваÑÑÑÑ Ð¿Ð¾Ð»Ðµ gid, ÑвлÑÑÑееÑÑ ÑаÑÑÑÑ Ð¿Ð°ÑамеÑÑа txn.
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);47.6.4.12. ÐбÑабоÑÑик ÑикÑаÑии подгоÑовленной ÑÑанзакÑии #
ÐбÑзаÑелÑнÑй обÑабоÑÑик commit_prepared_cb вÑзÑваеÑÑÑ Ð¿Ñи декодиÑовании ÐºÐ¾Ð¼Ð°Ð½Ð´Ñ COMMIT PREPARED ÑÑанзакÑии. Ð ÑÑом обÑабоÑÑике Ð¼Ð¾Ð¶ÐµÑ Ð¸ÑполÑзоваÑÑÑÑ Ð¿Ð¾Ð»Ðµ gid, ÑвлÑÑÑееÑÑ ÑаÑÑÑÑ Ð¿Ð°ÑамеÑÑа txn.
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);47.6.4.13. ÐбÑабоÑÑик оÑÐ¼ÐµÐ½Ñ Ð¿Ð¾Ð´Ð³Ð¾Ñовленной ÑÑанзакÑии #
ÐбÑзаÑелÑнÑй обÑабоÑÑик rollback_prepared_cb вÑзÑваеÑÑÑ Ð¿Ñи декодиÑовании ÐºÐ¾Ð¼Ð°Ð½Ð´Ñ ROLLBACK PREPARED ÑÑанзакÑии. Ð ÑÑом обÑабоÑÑике Ð¼Ð¾Ð¶ÐµÑ Ð¸ÑполÑзоваÑÑÑÑ Ð¿Ð¾Ð»Ðµ gid, коÑоÑое ÑвлÑеÑÑÑ ÑаÑÑÑÑ Ð¿Ð°ÑамеÑÑа txn. ÐÑоанализиÑовав паÑамеÑÑÑ prepare_end_lsn и prepare_time, модÑÐ»Ñ Ð´ÐµÐºÐ¾Ð´Ð¸ÑÐ¾Ð²Ð°Ð½Ð¸Ñ Ð¼Ð¾Ð¶ÐµÑ Ð¾Ð¿ÑеделиÑÑ, полÑÑал ли он PREPARE TRANSACTION, и еÑли да â вÑполниÑÑ Ð¾ÑÐ¼ÐµÐ½Ñ ÑÑанзакÑии, а в пÑоÑивном ÑлÑÑае пÑопÑÑÑиÑÑ ÑÑÑ Ð¾Ð¿ÐµÑаÑиÑ. Ðдного Ð¿Ð¾Ð»Ñ gid Ð´Ð»Ñ ÑÑого недоÑÑаÑоÑно, Ñак как на нижеÑÑоÑÑем Ñзле Ð¼Ð¾Ð¶ÐµÑ Ð±ÑÑÑ Ð¿Ð¾Ð´Ð³Ð¾ÑÐ¾Ð²Ð»ÐµÐ½Ð½Ð°Ñ ÑÑанзакÑÐ¸Ñ Ñ Ñаким же ÑÑÑоковÑм иденÑиÑикаÑоÑом.
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time);47.6.4.14. ÐбÑабоÑÑик наÑала поÑока #
ÐбÑзаÑелÑнÑй обÑабоÑÑик stream_start_cb вÑзÑваеÑÑÑ, когда оÑкÑÑваеÑÑÑ Ð±Ð»Ð¾Ðº пеÑедаваемÑÑ
в поÑоке изменений вÑполнÑÑÑейÑÑ ÑÑанзакÑии.
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);47.6.4.15. ÐбÑабоÑÑик оÑÑановки поÑока #
ÐбÑзаÑелÑнÑй обÑабоÑÑик stream_stop_cb вÑзÑваеÑÑÑ, когда закÑÑваеÑÑÑ Ð±Ð»Ð¾Ðº пеÑедаваемÑÑ
в поÑоке изменений вÑполнÑÑÑейÑÑ ÑÑанзакÑии.
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);47.6.4.16. ÐбÑабоÑÑик пÑеÑÑÐ²Ð°Ð½Ð¸Ñ Ð¿Ð¾Ñока #
ÐбÑзаÑелÑнÑй обÑабоÑÑик stream_abort_cb вÑзÑваеÑÑÑ Ð´Ð»Ñ Ð¿ÑеÑÑÐ²Ð°Ð½Ð¸Ñ Ð¿ÐµÑедаваемой в поÑоке ÑÑанзакÑии.
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);47.6.4.17. ÐбÑабоÑÑик подгоÑовки поÑока #
ÐбÑабоÑÑик stream_prepare_cb вÑзÑваеÑÑÑ Ð´Ð»Ñ Ð¿Ð¾Ð´Ð³Ð¾Ñовки Ñанее пеÑеданной в поÑоке ÑÑанзакÑии как ÑаÑÑи двÑÑ
Ñазной ÑикÑаÑии. ÐÑÐ¾Ñ Ð¾Ð±ÑабоÑÑик ÑÑебÑеÑÑÑ, когда плагин вÑвода поддеÑÐ¶Ð¸Ð²Ð°ÐµÑ ÐºÐ°Ðº поÑоковÑÑ Ð¿ÐµÑедаÑÑ Ð±Ð¾Ð»ÑÑиÑ
ÑекÑÑиÑ
ÑÑанзакÑий, Ñак и двÑÑ
ÑазнÑе ÑикÑаÑии.
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);47.6.4.18. ÐбÑабоÑÑик ÑикÑаÑии поÑока #
ÐбÑзаÑелÑнÑй обÑабоÑÑик stream_commit_cb вÑзÑваеÑÑÑ Ð´Ð»Ñ ÑикÑаÑии Ñанее пеÑеданной в поÑоке ÑÑанзакÑии.
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);47.6.4.19. ÐбÑабоÑÑик изменений в поÑоке #
ÐбÑзаÑелÑнÑй обÑабоÑÑик stream_change_cb вÑзÑваеÑÑÑ, когда пеÑедаÑÑÑÑ Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ðµ в блоке изменений (гÑаниÑÑ ÐºÐ¾ÑоÑого обознаÑаÑÑ Ð²ÑÐ·Ð¾Ð²Ñ stream_start_cb и stream_stop_cb). ФакÑиÑеÑкие Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ Ð½Ðµ оÑобÑажаÑÑÑÑ, поÑколÑÐºÑ ÑÑанзакÑÐ¸Ñ Ð¼Ð¾Ð¶ÐµÑ Ð±ÑÑÑ Ð¿ÑеÑвана позже, а Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ Ð´Ð»Ñ Ð¿ÑеÑваннÑÑ
ÑÑанзакÑий не декодиÑÑÑÑÑÑ.
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);47.6.4.20. ÐбÑабоÑÑик ÑообÑÐµÐ½Ð¸Ñ Ð¿Ð¾Ñока #
ÐеобÑзаÑелÑнÑй обÑабоÑÑик stream_message_cb вÑзÑваеÑÑÑ, когда пеÑедаÑÑÑÑ Ð¿ÑоизволÑное ÑообÑение в блоке изменений (гÑаниÑÑ ÐºÐ¾ÑоÑого обознаÑаÑÑ Ð²ÑÐ·Ð¾Ð²Ñ stream_start_cb и stream_stop_cb). СодеÑжимое ÑÑанзакÑионнÑÑ
ÑообÑений не оÑобÑажаеÑÑÑ, поÑколÑÐºÑ ÑÑанзакÑÐ¸Ñ Ð¼Ð¾Ð¶ÐµÑ Ð±ÑÑÑ Ð¿ÑеÑвана позже, а Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ Ð´Ð»Ñ Ð¿ÑеÑваннÑÑ
ÑÑанзакÑий не декодиÑÑÑÑÑÑ.
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);47.6.4.21. ÐбÑабоÑÑик ÐºÐ¾Ð¼Ð°Ð½Ð´Ñ Ð¾Ð¿ÑÑÑоÑÐµÐ½Ð¸Ñ Ð² поÑоке #
ÐеобÑзаÑелÑнÑй обÑабоÑÑик truncate_cb вÑзÑваеÑÑÑ Ð´Ð»Ñ ÐºÐ¾Ð¼Ð°Ð½Ð´Ñ TRUNCATE в блоке изменений (гÑаниÑÑ ÐºÐ¾ÑоÑого обознаÑаÑÑ Ð²ÑÐ·Ð¾Ð²Ñ stream_start_cb и stream_stop_cb).
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change); ÐаÑамеÑÑÑ ÑÑого обÑабоÑÑика аналогиÑÐ½Ñ Ð¿Ð°ÑамеÑÑам change_cb. Ðо Ñак как опеÑаÑии TRUNCATE в ÑаблиÑаÑ
, ÑвÑзаннÑÑ
внеÑними клÑÑами, Ð´Ð¾Ð»Ð¶Ð½Ñ Ð²ÑполнÑÑÑÑÑ Ð¾Ð´Ð½Ð¾Ð²Ñеменно, даннÑй обÑабоÑÑик полÑÑÐ°ÐµÑ Ð½Ð° вÑ
од не одно оÑноÑение, а маÑÑив оÑноÑений. Ðа подÑобноÑÑÑми обÑаÑиÑеÑÑ Ðº опиÑÐ°Ð½Ð¸Ñ Ð¾Ð¿ÐµÑаÑоÑа TRUNCATE.
47.6.5. ФÑнкÑии Ð´Ð»Ñ ÑоÑмиÑÐ¾Ð²Ð°Ð½Ð¸Ñ Ð²Ñвода #
ЧÑÐ¾Ð±Ñ Ð´ÐµÐ¹ÑÑвиÑелÑно вÑвеÑÑи даннÑе, модÑли вÑвода могÑÑ Ð·Ð°Ð¿Ð¸ÑÑваÑÑ Ð¸Ñ
в бÑÑÐµÑ StringInfo ÑеÑез ctx->out, внÑÑÑи обÑабоÑÑиков begin_cb, commit_cb или change_cb. ÐÑежде Ñем запиÑÑваÑÑ Ð´Ð°Ð½Ð½Ñе в ÑÑÐ¾Ñ Ð±ÑÑеÑ, необÑ
одимо вÑзваÑÑ OutputPluginPrepareWrite(ctx, last_write), а завеÑÑив запиÑÑ Ð² бÑÑеÑ, нÑжно вÑзваÑÑ OutputPluginWrite(ctx, last_write), ÑÑÐ¾Ð±Ñ ÑобÑÑвенно пÑоизвеÑÑи запиÑÑ. ÐаÑамеÑÑ last_write ÑказÑваеÑ, бÑла ли ÑÑа опÑеделÑÐ½Ð½Ð°Ñ Ð¾Ð¿ÐµÑаÑÐ¸Ñ Ð·Ð°Ð¿Ð¸Ñи поÑледней в данном обÑабоÑÑике.
СледÑÑÑий пÑÐ¸Ð¼ÐµÑ Ð¿Ð¾ÐºÐ°Ð·ÑваеÑ, как вÑвеÑÑи даннÑе Ð´Ð»Ñ Ð¿Ð¾ÑÑебиÑÐµÐ»Ñ Ð¼Ð¾Ð´ÑÐ»Ñ Ð²Ñвода:
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);