46.6. ÐодÑли вÑвода логиÑеÑкого декодиÑованиÑ
ÐÑÐ¸Ð¼ÐµÑ Ð¼Ð¾Ð´ÑÐ»Ñ Ð²Ñвода можно найÑи в подкаÑалоге contrib/test_decoding в деÑеве иÑÑ
одного кода Postgres Pro.
46.6.1. ФÑнкÑÐ¸Ñ Ð¸Ð½Ð¸ÑиализаÑии
ÐодÑÐ»Ñ Ð²Ñвода загÑÑжаеÑÑÑ Ð² ÑезÑлÑÑаÑе динамиÑеÑкой загÑÑзки ÑазделÑемой библиоÑеки (пÑи ÑÑом в каÑеÑÑве имени библиоÑеки задаÑÑÑÑ Ð¸Ð¼Ñ Ð¼Ð¾Ð´ÑлÑ). ÐÐ»Ñ Ð½Ð°Ñ
Ð¾Ð¶Ð´ÐµÐ½Ð¸Ñ Ð±Ð¸Ð±Ð»Ð¸Ð¾Ñеки пÑименÑеÑÑÑ Ð¾Ð±ÑÑнÑй пÑÑÑ Ð¿Ð¾Ð¸Ñка библиоÑек. Ð ÑÑой библиоÑеке должна бÑÑÑ ÑÑнкÑÐ¸Ñ _PG_output_plugin_init, коÑоÑÐ°Ñ Ð¿Ð¾ÐºÐ°Ð·ÑваеÑ, ÑÑо библиоÑека на Ñамом деле пÑедÑÑавлÑÐµÑ Ñобой модÑÐ»Ñ Ð²Ñвода, и ÑÑÑÐ°Ð½Ð°Ð²Ð»Ð¸Ð²Ð°ÐµÑ ÑÑебÑемÑе обÑабоÑÑики модÑÐ»Ñ Ð²Ñвода. ÐÑой ÑÑнкÑии пеÑедаÑÑÑÑ ÑÑÑÑкÑÑÑа, в коÑоÑой Ð´Ð¾Ð»Ð¶Ð½Ñ Ð±ÑÑÑ Ð·Ð°Ð¿Ð¾Ð»Ð½ÐµÐ½Ñ ÑказаÑели на ÑÑнкÑии-обÑабоÑÑики оÑделÑнÑÑ
дейÑÑвий.
typedef struct OutputPluginCallbacks
{
LogicalDecodeStartupCB startup_cb;
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); ÐбÑабоÑÑики begin_cb, change_cb и commit_cb Ð´Ð¾Ð»Ð¶Ð½Ñ ÑÑÑанавливаÑÑÑÑ Ð¾Ð±ÑзаÑелÑно, а startup_cb, filter_by_origin_cb и shutdown_cb могÑÑ Ð¾ÑÑÑÑÑÑвоваÑÑ.
46.6.2. ÐозможноÑÑи
ÐÐ»Ñ Ð´ÐµÐºÐ¾Ð´Ð¸ÑованиÑ, ÑоÑмаÑиÑÐ¾Ð²Ð°Ð½Ð¸Ñ Ð¸ вÑвода изменений модÑли вÑвода могÑÑ Ð¸ÑполÑзоваÑÑ Ð¿ÑакÑиÑеÑки вÑÑ Ð¾Ð±ÑÑнÑÑ Ð¸Ð½ÑÑаÑÑÑÑкÑÑÑÑ ÑеÑвеÑа, вклÑÑÐ°Ñ Ð²Ñзов ÑÑнкÑий вÑвода Ñипов. РоÑноÑениÑм ÑазÑеÑаеÑÑÑ Ð´Ð¾ÑÑÑп ÑолÑко на ÑÑение, еÑли ÑолÑко ÑÑи оÑноÑÐµÐ½Ð¸Ñ Ð±Ñли ÑÐ¾Ð·Ð´Ð°Ð½Ñ Ð¿ÑогÑаммой initdb в ÑÑ
еме pg_catalog, либо помеÑÐµÐ½Ñ ÐºÐ°Ðº полÑзоваÑелÑÑкие ÑаблиÑÑ ÐºÐ°Ñалогов командами
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
ÐÑбÑе дейÑÑвиÑ, коÑоÑÑе ÑÑебÑÑÑ Ð¿ÑиÑÐ²Ð°Ð¸Ð²Ð°Ð½Ð¸Ñ Ð¸Ð´ÐµÐ½ÑиÑикаÑоÑа ÑÑанзакÑии, запÑеÑаÑÑÑÑ. Ð ÑаÑÑноÑÑи, к ÑÑим дейÑÑвиÑм оÑноÑÑÑÑÑ Ð¾Ð¿ÐµÑаÑии запиÑи в ÑаблиÑÑ, Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ DDL и вÑзов txid_current().
46.6.3. Ð ÐµÐ¶Ð¸Ð¼Ñ Ð²Ñвода
ÐбÑабоÑÑики в модÑле вÑвода могÑÑ Ð¿ÐµÑедаваÑÑ Ð´Ð°Ð½Ð½Ñе поÑÑебиÑÐµÐ»Ñ Ð² пÑакÑиÑеÑки лÑбÑÑ
ÑоÑмаÑаÑ
. ÐÐ»Ñ Ð½ÐµÐºÐ¾ÑоÑÑÑ
ваÑианÑов иÑполÑзованиÑ, напÑимеÑ, пÑоÑмоÑÑа изменений ÑеÑез SQL, вÑвод инÑоÑмаÑии в ÑипаÑ
, коÑоÑÑе могÑÑ ÑодеÑжаÑÑ Ð¿ÑоизволÑнÑе даннÑе (напÑимеÑ, bytea), Ð¼Ð¾Ð¶ÐµÑ Ð±ÑÑÑ Ð½ÐµÑдобоваÑимÑм. ÐÑли модÑÐ»Ñ Ð²Ñвода вÑÐ²Ð¾Ð´Ð¸Ñ ÑолÑко ÑекÑÑовÑе даннÑе в кодиÑовке ÑеÑвеÑа, он Ð¼Ð¾Ð¶ÐµÑ Ð¾Ð±ÑÑвиÑÑ ÑÑо, ÑÑÑановив в OutputPluginOptions.output_type знаÑение OUTPUT_PLUGIN_TEXTUAL_OUTPUT вмеÑÑо OUTPUT_PLUGIN_BINARY_OUTPUT в обÑабоÑÑике запÑÑка. Ð ÑÑом ÑлÑÑае вÑе даннÑе Ð´Ð¾Ð»Ð¶Ð½Ñ Ð±ÑÑÑ Ð² кодиÑовке ÑеÑвеÑа, ÑÑÐ¾Ð±Ñ Ð¸Ñ
можно бÑло пеÑедаÑÑ Ð² знаÑении Ñипа text. ÐÑо конÑÑолиÑÑеÑÑÑ Ð² ÑбоÑкаÑ
Ñ Ð²ÐºÐ»ÑÑÑннÑми пÑовеÑоÑнÑми ÑÑвеÑждениÑми.
46.6.4. ÐбÑабоÑÑики в модÑле вÑвода
ÐодÑÐ»Ñ Ð²Ñвода ÑведомлÑеÑÑÑ Ð¾ пÑоиÑÑ Ð¾Ð´ÑÑÐ¸Ñ Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸ÑÑ ÑеÑез ÑазлиÑнÑе обÑабоÑÑики, коÑоÑÑе он должен ÑÑÑановиÑÑ.
ÐаÑаллелÑнÑе ÑÑанзакÑии декодиÑÑÑÑÑÑ Ð² поÑÑдке ÑикÑиÑованиÑ, пÑи ÑÑом ÑолÑко изменениÑ, оÑноÑÑÑиеÑÑ Ðº опÑеделÑнной ÑÑанзакÑии, декодиÑÑÑÑÑÑ Ð¼ÐµÐ¶Ð´Ñ Ð²Ñзовами обÑабоÑÑиков begin и commit. ТÑанзакÑии, оÑменÑннÑе Ñвно или неÑвно, никогда не декодиÑÑÑÑÑÑ. УÑпеÑнÑе ÑоÑки ÑоÑ
ÑÐ°Ð½ÐµÐ½Ð¸Ñ Ð·Ð°Ð²Ð¾ÑаÑиваÑÑÑÑ Ð² ÑÑанзакÑиÑ, ÑодеÑжаÑÑÑ Ð¸Ñ
, в Ñом поÑÑдке, в како они вÑполнÑлиÑÑ Ð² ÑÑой ÑÑанзакÑии.
ÐÑимеÑание
ÐекодиÑоваÑÑÑÑ Ð±ÑдÑÑ ÑолÑко Ñе ÑÑанзакÑии, коÑоÑÑе Ñже ÑÑпеÑно ÑбÑоÑÐµÐ½Ñ Ð½Ð° диÑк. ÐÑледÑÑвие ÑÑого, COMMIT Ð¼Ð¾Ð¶ÐµÑ Ð½Ðµ декодиÑоваÑÑÑÑ Ð² ÑледÑÑÑем ÑÑÐ°Ð·Ñ Ð·Ð° ним вÑзове pg_logical_slot_get_changes(), когда synchronous_commit Ð¸Ð¼ÐµÐµÑ Ð·Ð½Ð°Ñение off.
46.6.4.1. ÐбÑабоÑÑик запÑÑка
ÐеобÑзаÑелÑнÑй обÑабоÑÑик startup_cb вÑзÑваеÑÑÑ, когда ÑÐ»Ð¾Ñ ÑепликаÑии ÑоздаÑÑÑÑ Ð¸Ð»Ð¸ ÑеÑез него запÑаÑиваеÑÑÑ Ð¿ÐµÑедаÑа изменений, незавиÑимо Ð¾Ñ Ñого, в каком колиÑеÑÑве Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ Ð³Ð¾ÑÐ¾Ð²Ñ Ðº пеÑедаÑе.
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
OutputPluginOptions *options,
bool is_init); ÐаÑамеÑÑ is_init бÑÐ´ÐµÑ Ñавен true, когда ÑÐ»Ð¾Ñ ÑепликаÑии ÑоздаÑÑÑÑ, и false в пÑоÑивном ÑлÑÑае. ÐаÑамеÑÑ options ÑказÑÐ²Ð°ÐµÑ Ð½Ð° ÑÑÑÑкÑÑÑÑ Ð¿Ð°ÑамеÑÑов, коÑоÑÑе могÑÑ ÑÑÑанавливаÑÑ Ð¼Ð¾Ð´Ñли вÑвода:
typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
} OutputPluginOptions; Рполе output_type должно бÑÑÑ Ð·Ð½Ð°Ñение OUTPUT_PLUGIN_TEXTUAL_OUTPUT или OUTPUT_PLUGIN_BINARY_OUTPUT. См. Ñакже ÐодÑаздел 46.6.3.
ÐбÑабоÑÑик запÑÑка должен пÑовеÑиÑÑ Ð¿Ð°ÑамеÑÑÑ, пÑедÑÑавленнÑе в ctx->output_plugin_options. ÐÑли модÑÐ»Ñ Ð²Ñвода ÑÑебÑеÑÑÑ Ð¿Ð¾Ð´Ð´ÐµÑживаÑÑ ÑоÑÑоÑние, он Ð¼Ð¾Ð¶ÐµÑ ÑоÑ
ÑаниÑÑ ÐµÐ³Ð¾ в ctx->output_plugin_private.
46.6.4.2. ÐбÑабоÑÑик вÑклÑÑениÑ
ÐеобÑзаÑелÑнÑй обÑабоÑÑик shutdown_cb вÑзÑваеÑÑÑ, когда Ñанее акÑивнÑй ÑÐ»Ð¾Ñ ÑепликаÑии пеÑеÑÑаÑÑ Ð¸ÑполÑзоваÑÑÑÑ, Ñак ÑÑо ÑеÑÑÑÑÑ, занÑÑÑе модÑлем вÑвода, можно оÑвободиÑÑ. ÐÑи ÑÑом ÑÐ»Ð¾Ñ Ð½Ðµ обÑзаÑелÑно ÑдалÑеÑÑÑ, пÑекÑаÑаеÑÑÑ ÑолÑко поÑÐ¾ÐºÐ¾Ð²Ð°Ñ Ð¿ÐµÑедаÑа ÑеÑез него.
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
46.6.4.3. ÐбÑабоÑÑик наÑала ÑÑанзакÑии
ÐбÑзаÑелÑнÑй обÑабоÑÑик begin_cb вÑзÑваеÑÑÑ, когда декодиÑÑеÑÑÑ Ð½Ð°Ñало заÑикÑиÑованной ÑÑанзакÑии. ÐÑеÑваннÑе ÑÑанзакÑии и иÑ
ÑодеÑжимое никогда не декодиÑÑеÑÑÑ.
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn); ÐаÑамеÑÑ txn ÑодеÑÐ¶Ð¸Ñ Ð¼ÐµÑаинÑоÑмаÑÐ¸Ñ Ð¾ ÑÑанзакÑии, в ÑаÑÑноÑÑи ÐµÑ Ð¸Ð´ÐµÐ½ÑиÑикаÑÐ¾Ñ Ð¸ вÑÐµÐ¼Ñ ÐµÑ ÑикÑиÑованиÑ.
46.6.4.4. ÐбÑабоÑÑик завеÑÑÐµÐ½Ð¸Ñ ÑÑанзакÑии
ÐбÑзаÑелÑнÑй обÑабоÑÑик commit_cb вÑзÑваеÑÑÑ, когда декодиÑÑеÑÑÑ ÑикÑиÑование ÑÑанзакÑии. ÐеÑед ÑÑим обÑабоÑÑиком бÑÐ´ÐµÑ Ð²ÑзÑваÑÑÑÑ Ð¾Ð±ÑабоÑÑик change_cb Ð´Ð»Ñ Ð²ÑеÑ
изменÑннÑÑ
ÑÑÑок (еÑли ÑÑÑоки бÑли измененÑ).
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);46.6.4.5. ÐбÑабоÑÑик изменениÑ
ÐбÑзаÑелÑнÑй обÑабоÑÑик change_cb вÑзÑваеÑÑÑ Ð´Ð»Ñ ÐºÐ°Ð¶Ð´Ð¾Ð³Ð¾ оÑделÑного Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ ÑÑÑоки в ÑÑанзакÑии, пÑоизводимого командами INSERT, UPDATE или DELETE. Ðаже еÑли команда изменила неÑколÑко ÑÑÑок ÑÑазÑ, ÑÑÐ¾Ñ Ð¾Ð±ÑабоÑÑик бÑÐ´ÐµÑ Ð²ÑзÑваÑÑÑÑ Ð´Ð»Ñ ÐºÐ°Ð¶Ð´Ð¾Ð¹ оÑделÑной ÑÑÑоки.
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change); ÐаÑамеÑÑÑ ctx и txn имеÑÑ Ñо же ÑодеÑжимое, ÑÑо и Ð´Ð»Ñ Ð¾Ð±ÑабоÑÑиков begin_cb и commit_cb; дополниÑелÑнÑй деÑкÑипÑÐ¾Ñ Ð¾ÑноÑÐµÐ½Ð¸Ñ relation ÑказÑÐ²Ð°ÐµÑ Ð½Ð° оÑноÑение, к коÑоÑÐ¾Ð¼Ñ Ð¿ÑÐ¸Ð½Ð°Ð´Ð»ÐµÐ¶Ð¸Ñ ÑÑÑока, а ÑÑÑÑкÑÑÑа change опиÑÑÐ²Ð°ÐµÑ Ð¿ÐµÑедаваемое изменение ÑÑÑоки.
ÐÑимеÑание
РпÑоÑеÑÑе логиÑеÑкого декодиÑÐ¾Ð²Ð°Ð½Ð¸Ñ Ð¼Ð¾Ð³ÑÑ Ð±ÑÑÑ Ð¾Ð±ÑабоÑÐ°Ð½Ñ Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ ÑолÑко в ÑаблиÑаÑ
, не ÑвлÑÑÑиÑ
ÑÑ Ð½ÐµÐ¶ÑÑналиÑÑемÑми (Ñм. опиÑание UNLOGGED) или вÑеменнÑми (Ñм. опиÑание TEMPORARY или TEMP).
46.6.4.6. ÐбÑабоÑÑик ÑилÑÑÑаÑии иÑÑоÑника
ÐеобÑзаÑелÑнÑй обÑабоÑÑик filter_by_origin_cb вÑзÑваеÑÑÑ, ÑÑÐ¾Ð±Ñ Ð¾ÑмеÑиÑÑ, инÑеÑеÑÑÑÑ Ð»Ð¸ модÑÐ»Ñ Ð²Ñвода изменениÑ, воÑпÑоизводимÑе из Ñказанного иÑÑоÑника (origin_id).
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
RepOriginId origin_id); РпаÑамеÑÑе ctx пеÑедаÑÑÑÑ Ñа же инÑоÑмаÑиÑ, ÑÑо и Ð´Ð»Ñ Ð´ÑÑгиÑ
обÑабоÑÑиков. ЧÑÐ¾Ð±Ñ Ð¾ÑмеÑиÑÑ, ÑÑо изменениÑ, поÑÑÑпаÑÑие из пеÑеданного Ñзла, не пÑедÑÑавлÑÑÑ Ð¸Ð½ÑеÑеÑа, модÑÐ»Ñ Ð´Ð¾Ð»Ð¶ÐµÐ½ веÑнÑÑÑ true, вÑледÑÑвие Ñего ÑÑи Ð¸Ð·Ð¼ÐµÐ½ÐµÐ½Ð¸Ñ Ð±ÑдÑÑ ÑилÑÑÑоваÑÑÑÑ; в пÑоÑивном ÑлÑÑае он должен веÑнÑÑÑ false. ÐÑÑгие обÑабоÑÑики Ð´Ð»Ñ ÑилÑÑÑÑемÑÑ
ÑÑанзакÑий и изменений вÑзÑваÑÑÑÑ Ð½Ðµ бÑдÑÑ.
ÐÑо полезно пÑи ÑеализаÑии каÑкадной или ÑазнонапÑавленной ÑепликаÑии. ФилÑÑÑаÑÐ¸Ñ Ð¿Ð¾ иÑÑоÑÐ½Ð¸ÐºÑ Ð² ÑÐ°ÐºÐ¸Ñ ÐºÐ¾Ð½ÑигÑÑаÑиÑÑ Ð¿Ð¾Ð·Ð²Ð¾Ð»ÑÐµÑ Ð¿ÑедоÑвÑаÑиÑÑ Ð¿ÐµÑедаÑÑ Ð²Ð·Ð°Ð´-впеÑÑд Ð¾Ð´Ð½Ð¸Ñ Ð¸ ÑÐµÑ Ð¶Ðµ изменений. ХоÑÑ Ð¸Ð½ÑоÑмаÑÐ¸Ñ Ð¾Ð± иÑÑоÑнике можно Ñакже извлеÑÑ Ð¸Ð· ÑÑанзакÑий и изменений, ÑилÑÑÑаÑÐ¸Ñ Ñ Ð¿Ð¾Ð¼Ð¾ÑÑÑ ÑÑого обÑабоÑÑика гоÑаздо более ÑÑÑекÑивна.
46.6.4.7. ÐбÑабоÑÑик пÑоизволÑнÑÑ ÑообÑений
ÐеобÑзаÑелÑнÑй обÑабоÑÑик message_cb вÑзÑваеÑÑÑ Ð¿Ñи полÑÑении ÑообÑÐµÐ½Ð¸Ñ Ð»Ð¾Ð³Ð¸ÑеÑкого декодиÑованиÑ.
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message); ÐаÑамеÑÑ txn ÑодеÑÐ¶Ð¸Ñ Ð¼ÐµÑаинÑоÑмаÑÐ¸Ñ Ð¾ ÑÑанзакÑии, вклÑÑÐ°Ñ Ð²ÑÐµÐ¼Ñ ÐµÑ ÑикÑаÑии и ÐµÑ XID. ÐамеÑÑÑе, однако, ÑÑо в нÑм Ð¼Ð¾Ð¶ÐµÑ Ð¿ÐµÑедаваÑÑÑÑ NULL, когда ÑообÑение неÑÑанзакÑионное и ÑÑанзакÑии, в коÑоÑой бÑло вÑдано ÑообÑение, еÑÑ Ð½Ðµ назнаÑен XID. РпаÑамеÑÑе lsn оÑмеÑаеÑÑÑ Ð¿Ð¾Ð·Ð¸ÑÐ¸Ñ ÑообÑÐµÐ½Ð¸Ñ Ð² WAL. ÐаÑамеÑÑ transactional показÑваеÑ, бÑло ли ÑообÑение пеÑедано как ÑÑанзакÑионное. РпаÑамеÑÑе prefix пеÑедаÑÑÑÑ Ð½ÐµÐºÐ¾ÑоÑÑй пÑеÑÐ¸ÐºÑ (завеÑÑаÑÑийÑÑ Ð½ÑлÑм), по коÑоÑÐ¾Ð¼Ñ ÑекÑÑий модÑÐ»Ñ Ð¼Ð¾Ð¶ÐµÑ Ð²ÑделÑÑÑ Ð¸Ð½ÑеÑеÑÑÑÑие его ÑообÑениÑ. РнаконеÑ, паÑамеÑÑ message ÑодеÑÐ¶Ð¸Ñ Ñамо ÑообÑение ÑазмеÑом message_size байÑ.
ÐÐµÐ¾Ð±Ñ Ð¾Ð´Ð¸Ð¼Ð¾ дополниÑелÑно позабоÑиÑÑÑÑ Ð¾ Ñом, ÑÑÐ¾Ð±Ñ Ð¿ÑеÑикÑ, опÑеделÑÑÑий инÑеÑеÑÑÑÑие модÑÐ»Ñ Ð²Ñвода ÑообÑениÑ, бÑл ÑникалÑнÑм. УдаÑнÑм вÑбоÑом обÑÑно бÑÐ´ÐµÑ Ð¸Ð¼Ñ ÑаÑÑиÑÐµÐ½Ð¸Ñ Ð¸Ð»Ð¸ Ñамого модÑÐ»Ñ Ð²Ñвода.
46.6.5. ФÑнкÑии Ð´Ð»Ñ ÑоÑмиÑÐ¾Ð²Ð°Ð½Ð¸Ñ Ð²Ñвода
ЧÑÐ¾Ð±Ñ Ð´ÐµÐ¹ÑÑвиÑелÑно вÑвеÑÑи даннÑе, модÑли вÑвода могÑÑ Ð·Ð°Ð¿Ð¸ÑÑваÑÑ Ð¸Ñ
в бÑÑÐµÑ StringInfo ÑеÑез ctx->out, внÑÑÑи обÑабоÑÑиков begin_cb, commit_cb или change_cb. ÐÑежде Ñем запиÑÑваÑÑ Ð´Ð°Ð½Ð½Ñе в ÑÑÐ¾Ñ Ð±ÑÑеÑ, необÑ
одимо вÑзваÑÑ OutputPluginPrepareWrite(ctx, last_write), а завеÑÑив запиÑÑ Ð² бÑÑеÑ, нÑжно вÑзваÑÑ OutputPluginWrite(ctx, last_write), ÑÑÐ¾Ð±Ñ ÑобÑÑвенно пÑоизвеÑÑи запиÑÑ. ÐаÑамеÑÑ last_write ÑказÑваеÑ, бÑла ли ÑÑа опÑеделÑÐ½Ð½Ð°Ñ Ð¾Ð¿ÐµÑаÑÐ¸Ñ Ð·Ð°Ð¿Ð¸Ñи поÑледней в данном обÑабоÑÑике.
СледÑÑÑий пÑÐ¸Ð¼ÐµÑ Ð¿Ð¾ÐºÐ°Ð·ÑваеÑ, как вÑвеÑÑи даннÑе Ð´Ð»Ñ Ð¿Ð¾ÑÑебиÑÐµÐ»Ñ Ð¼Ð¾Ð´ÑÐ»Ñ Ð²Ñвода:
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);