This file is indexed.

/usr/share/postgresql/9.3/contrib/londiste.sql is in skytools-modules-9.3 2.1.13-4.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
set default_with_oids = 'off';

create schema londiste;

create table londiste.provider_table (
    nr                  serial not null,
    queue_name          text not null,
    table_name          text not null,
    trigger_name        text,
    primary key (queue_name, table_name)
);

create table londiste.provider_seq (
    nr                  serial not null,
    queue_name          text not null,
    seq_name            text not null,
    primary key (queue_name, seq_name)
);

create table londiste.completed (
    consumer_id     text not null,
    last_tick_id    bigint not null,

    primary key (consumer_id)
);

create table londiste.link (
    source    text not null,
    dest      text not null,
    primary key (source),
    unique (dest)
);

create table londiste.subscriber_table (
    nr                  serial not null,
    queue_name          text not null,
    table_name          text not null,
    snapshot            text,
    merge_state         text,
    trigger_name        text,

    skip_truncate       bool,

    primary key (queue_name, table_name)
);

create table londiste.subscriber_seq (
    nr                  serial not null,
    queue_name          text not null,
    seq_name            text not null,

    primary key (queue_name, seq_name)
);

create table londiste.subscriber_pending_fkeys (
    from_table          text not null,
    to_table            text not null,
    fkey_name           text not null,
    fkey_def            text not null,
    
    primary key (from_table, fkey_name)
);

create table londiste.subscriber_pending_triggers (
    table_name          text not null,
    trigger_name        text not null,
    trigger_def         text not null,
    
    primary key (table_name, trigger_name)
);

grant usage on schema londiste to public;
grant select on londiste.provider_table to public;
grant select on londiste.completed to public;
grant select on londiste.link to public;
grant select on londiste.subscriber_table to public;


create type londiste.ret_provider_table_list as (
    table_name text,
    trigger_name text
);

create type londiste.ret_subscriber_table as (
    table_name text,
    merge_state text,
    snapshot text,
    trigger_name text,
    skip_truncate bool
);

create or replace function londiste.find_column_types(tbl text)
returns text as $$
declare
    res      text;
    col      record;
    tbl_oid  oid;
begin
    tbl_oid := londiste.find_table_oid(tbl);
    res := '';
    for col in 
        SELECT CASE WHEN k.attname IS NOT NULL THEN 'k' ELSE 'v' END AS type
            FROM pg_attribute a LEFT JOIN (
                SELECT k.attname FROM pg_index i, pg_attribute k
                 WHERE i.indrelid = tbl_oid AND k.attrelid = i.indexrelid
                   AND i.indisprimary AND k.attnum > 0 AND NOT k.attisdropped
                ) k ON (k.attname = a.attname)
            WHERE a.attrelid = tbl_oid AND a.attnum > 0 AND NOT a.attisdropped
            ORDER BY a.attnum
    loop
        res := res || col.type;
    end loop;

    return res;
end;
$$ language plpgsql strict stable;


create or replace function londiste.find_table_fkeys(i_table_name text)
returns setof londiste.subscriber_pending_fkeys as $$
declare
    fkey      record;
    tbl_oid   oid;
begin
    select londiste.find_table_oid(i_table_name) into tbl_oid;
        
    for fkey in
        select n1.nspname || '.' || t1.relname as from_table, n2.nspname || '.' || t2.relname as to_table,
            conname::text as fkey_name, 
            'alter table only ' || quote_ident(n1.nspname) || '.' || quote_ident(t1.relname)
            || ' add constraint ' || quote_ident(conname::text) || ' ' || pg_get_constraintdef(c.oid)
            as fkey_def
        from pg_constraint c, pg_namespace n1, pg_class t1, pg_namespace n2, pg_class t2
        where c.contype = 'f' and (c.conrelid = tbl_oid or c.confrelid = tbl_oid)
            and t1.oid = c.conrelid and n1.oid = t1.relnamespace
            and t2.oid = c.confrelid and n2.oid = t2.relnamespace
        order by 1,2,3
    loop
        return next fkey;
    end loop;
    
    return;
end;
$$ language plpgsql strict stable;


create or replace function londiste.find_rel_oid(tbl text, kind text)
returns oid as $$
declare
    res      oid;
    pos      integer;
    schema   text;
    name     text;
begin
    pos := position('.' in tbl);
    if pos > 0 then
        schema := substring(tbl for pos - 1);
        name := substring(tbl from pos + 1);
    else
        schema := 'public';
        name := tbl;
    end if;
    select c.oid into res
      from pg_namespace n, pg_class c
     where c.relnamespace = n.oid
       and c.relkind = kind
       and n.nspname = schema and c.relname = name;
    if not found then
        if kind = 'r' then
            raise exception 'table not found';
        elsif kind = 'S' then
            raise exception 'seq not found';
        else
            raise exception 'weird relkind';
        end if;
    end if;

    return res;
end;
$$ language plpgsql strict stable;

create or replace function londiste.find_table_oid(tbl text)
returns oid as $$
begin
    return londiste.find_rel_oid(tbl, 'r');
end;
$$ language plpgsql strict stable;

create or replace function londiste.find_seq_oid(tbl text)
returns oid as $$
begin
    return londiste.find_rel_oid(tbl, 'S');
end;
$$ language plpgsql strict stable;


create or replace function londiste.find_table_triggers(i_table_name text)
returns setof londiste.subscriber_pending_triggers as $$
declare
    tg        record;
    ver       int4;
begin
    select setting::int4 into ver from pg_settings
     where name = 'server_version_num';

    if ver >= 90000 then
        for tg in
            select n.nspname || '.' || c.relname as table_name, t.tgname::text as name, pg_get_triggerdef(t.oid) as def 
            from pg_trigger t, pg_class c, pg_namespace n
            where n.oid = c.relnamespace and c.oid = t.tgrelid
                and t.tgrelid = londiste.find_table_oid(i_table_name)
                and not t.tgisinternal
        loop
            return next tg;
        end loop;
    else
        for tg in
            select n.nspname || '.' || c.relname as table_name, t.tgname::text as name, pg_get_triggerdef(t.oid) as def 
            from pg_trigger t, pg_class c, pg_namespace n
            where n.oid = c.relnamespace and c.oid = t.tgrelid
                and t.tgrelid = londiste.find_table_oid(i_table_name)
                and not t.tgisconstraint
        loop
            return next tg;
        end loop;
    end if;
    
    return;
end;
$$ language plpgsql strict stable;

create or replace function londiste.get_last_tick(i_consumer text)
returns bigint as $$
declare
    res   bigint;
begin
    select last_tick_id into res
      from londiste.completed
     where consumer_id = i_consumer;
    return res;
end;
$$ language plpgsql security definer strict stable;


create or replace function londiste.link_source(i_dst_name text)
returns text as $$
declare
    res  text;
begin
    select source into res from londiste.link
     where dest = i_dst_name;
    return res;
end;
$$ language plpgsql security definer;

create or replace function londiste.link_dest(i_source_name text)
returns text as $$
declare
    res  text;
begin
    select dest into res from londiste.link
     where source = i_source_name;
    return res;
end;
$$ language plpgsql security definer;

create or replace function londiste.cmp_list(list1 text, a_queue text, a_table text, a_field text)
returns boolean as $$
declare
    sql   text;
    tmp   record;
    list2 text;
begin
    sql := 'select ' || quote_ident(a_field) || ' as name from ' || londiste.quote_fqname(a_table)
        || ' where queue_name = ' || quote_literal(a_queue)
        || ' order by 1';
    list2 := '';
    for tmp in execute sql loop
        if list2 = '' then
            list2 := tmp.name;
        else
            list2 := list2 || ',' || tmp.name;
        end if;
    end loop;
    return list1 = list2;
end;
$$ language plpgsql security definer;

create or replace function londiste.link(i_source_name text, i_dest_name text, prov_tick_id bigint, prov_tbl_list text, prov_seq_list text)
returns text as $$
declare
    tmp  text;
    list text;
    tick_seq text;
    external boolean;
    last_tick bigint;
begin
    -- check if all matches
    if not londiste.cmp_list(prov_tbl_list, i_source_name,
                             'londiste.subscriber_table', 'table_name')
    then
        raise exception 'not all tables copied into subscriber';
    end if;
    if not londiste.cmp_list(prov_seq_list, i_source_name,
                             'londiste.subscriber_seq', 'seq_name')
    then
        raise exception 'not all seqs copied into subscriber';
    end if;
    if not londiste.cmp_list(prov_seq_list, i_dest_name,
                             'londiste.provider_table', 'table_name')
    then
        raise exception 'linked provider queue does not have all tables';
    end if;
    if not londiste.cmp_list(prov_seq_list, i_dest_name,
                             'londiste.provider_seq', 'seq_name')
    then
        raise exception 'linked provider queue does not have all seqs';
    end if;

    -- check pgq
    select queue_external_ticker, queue_tick_seq into external, tick_seq
        from pgq.queue where queue_name = i_dest_name;
    if not found then
        raise exception 'dest queue does not exist';
    end if;
    if external then
        raise exception 'dest queue has already external_ticker turned on?';
    end if;

    if nextval(tick_seq) >= prov_tick_id then
        raise exception 'dest queue ticks larger';
    end if;
    
    update pgq.queue set queue_external_ticker = true
        where queue_name = i_dest_name;

    insert into londiste.link (source, dest) values (i_source_name, i_dest_name);

    return null;
end;
$$ language plpgsql security definer;

create or replace function londiste.link_del(i_source_name text, i_dest_name text)
returns text as $$
begin
    delete from londiste.link
     where source = i_source_name
       and dest = i_dest_name;
    if not found then
        raise exception 'no suck link';
    end if;
    return null;
end;
$$ language plpgsql security definer;


create or replace function londiste.provider_add_seq(
    i_queue_name text, i_seq_name text)
returns integer as $$
declare
    link text;
begin
    -- check if linked queue
    link := londiste.link_source(i_queue_name);
    if link is not null then
        raise exception 'Linked queue, cannot modify';
    end if;

    perform 1 from pg_class
        where oid = londiste.find_seq_oid(i_seq_name);
    if not found then
        raise exception 'seq not found';
    end if;

    insert into londiste.provider_seq (queue_name, seq_name)
        values (i_queue_name, i_seq_name);

    return 0;
end;
$$ language plpgsql security definer;

create or replace function londiste.provider_add_table(
    i_queue_name    text,
    i_table_name    text,
    i_col_types     text
) returns integer strict as $$
declare
    tgname text;
    sql    text;
begin
    if londiste.link_source(i_queue_name) is not null then
        raise exception 'Linked queue, manipulation not allowed';
    end if;

    if position('k' in i_col_types) < 1 then
        raise exception 'need key column';
    end if;
    if position('.' in i_table_name) < 1 then
        raise exception 'need fully-qualified table name';
    end if;
    select queue_name into tgname
        from pgq.queue where queue_name = i_queue_name;
    if not found then
        raise exception 'no such event queue';
    end if;

    tgname := i_queue_name || '_logger';
    tgname := replace(lower(tgname), '.', '_');
    insert into londiste.provider_table
        (queue_name, table_name, trigger_name)
        values (i_queue_name, i_table_name, tgname);

    perform londiste.provider_create_trigger(
        i_queue_name, i_table_name, i_col_types);

    return 1;
end;
$$ language plpgsql security definer;

create or replace function londiste.provider_add_table(
    i_queue_name text,
    i_table_name text
) returns integer as $$
begin
    return londiste.provider_add_table(i_queue_name, i_table_name,
        londiste.find_column_types(i_table_name));
end;
$$ language plpgsql security definer;


create or replace function londiste.provider_create_trigger(
    i_queue_name    text,
    i_table_name    text,
    i_col_types     text
) returns integer strict as $$
declare
    tgname text;
begin
    select trigger_name into tgname
        from londiste.provider_table
        where queue_name = i_queue_name
          and table_name = i_table_name;
    if not found then
        raise exception 'table not found';
    end if;

    execute 'create trigger ' || quote_ident(tgname)
        || ' after insert or update or delete on '
        || londiste.quote_fqname(i_table_name)
        || ' for each row execute procedure pgq.logtriga('
        || quote_literal(i_queue_name) || ', '
        || quote_literal(i_col_types) || ', '
        || quote_literal(i_table_name) || ')';

    return 1;
end;
$$ language plpgsql security definer;


create or replace function londiste.provider_get_seq_list(i_queue_name text)
returns setof text as $$
declare
    rec record;
begin
    for rec in
        select seq_name from londiste.provider_seq
            where queue_name = i_queue_name
            order by nr
    loop
        return next rec.seq_name;
    end loop;
    return;
end;
$$ language plpgsql security definer;


create or replace function londiste.provider_get_table_list(i_queue text)
returns setof londiste.ret_provider_table_list as $$ 
declare 
    rec   londiste.ret_provider_table_list%rowtype; 
begin 
    for rec in 
        select table_name, trigger_name
            from londiste.provider_table
            where queue_name = i_queue
            order by nr
    loop
        return next rec;
    end loop; 
    return;
end; 
$$ language plpgsql security definer;


create or replace function londiste.provider_notify_change(i_queue_name text)
returns integer as $$
declare
    res      text;
    tbl      record;
begin
    res := '';
    for tbl in
        select table_name from londiste.provider_table
            where queue_name = i_queue_name
            order by nr
    loop
        if res = '' then
            res := tbl.table_name;
        else
            res := res || ',' || tbl.table_name;
        end if;
    end loop;
    
    perform pgq.insert_event(i_queue_name, 'T', res);

    return 1;
end;
$$ language plpgsql security definer;


create or replace function londiste.provider_refresh_trigger(
    i_queue_name    text,
    i_table_name    text,
    i_col_types     text
) returns integer strict as $$
declare
    t_name   text;
    tbl_oid  oid;
begin
    select trigger_name into t_name
        from londiste.provider_table
        where queue_name = i_queue_name
          and table_name = i_table_name;
    if not found then
        raise exception 'table not found';
    end if;

    tbl_oid := londiste.find_table_oid(i_table_name);
    perform 1 from pg_trigger
        where tgrelid = tbl_oid
          and tgname = t_name;
    if found then
        execute 'drop trigger ' || quote_ident(t_name)
            || ' on ' || londiste.quote_fqname(i_table_name);
    end if;

    perform londiste.provider_create_trigger(i_queue_name, i_table_name, i_col_types);

    return 1;
end;
$$ language plpgsql security definer;

create or replace function londiste.provider_refresh_trigger(
    i_queue_name    text,
    i_table_name    text
) returns integer strict as $$
begin
    return londiste.provider_refresh_trigger(i_queue_name, i_table_name,
                            londiste.find_column_types(i_table_name));
end;
$$ language plpgsql security definer;




create or replace function londiste.provider_remove_seq(
    i_queue_name text, i_seq_name text)
returns integer as $$
declare
    link text;
begin
    -- check if linked queue
    link := londiste.link_source(i_queue_name);
    if link is not null then
        raise exception 'Linked queue, cannot modify';
    end if;

    delete from londiste.provider_seq
        where queue_name = i_queue_name
          and seq_name = i_seq_name;
    if not found then
        raise exception 'seq not attached';
    end if;

    perform londiste.provider_notify_change(i_queue_name);

    return 0;
end;
$$ language plpgsql security definer;


create or replace function londiste.provider_remove_table(
    i_queue_name   text,
    i_table_name   text
) returns integer as $$
declare
    tgname text;
begin
    if londiste.link_source(i_queue_name) is not null then
        raise exception 'Linked queue, manipulation not allowed';
    end if;

    select trigger_name into tgname from londiste.provider_table
        where queue_name = i_queue_name
          and table_name = i_table_name;
    if not found then
        raise exception 'no such table registered';
    end if;

    begin
        execute 'drop trigger ' || quote_ident(tgname) || ' on ' || londiste.quote_fqname(i_table_name);
    exception
        when undefined_table then
            raise notice 'table % does not exist', i_table_name;
        when undefined_object then
            raise notice 'trigger % does not exist on table %', tgname, i_table_name;
    end;

    delete from londiste.provider_table
        where queue_name = i_queue_name
          and table_name = i_table_name;

    return 1;
end;
$$ language plpgsql security definer;



create or replace function londiste.quote_fqname(i_name text)
returns text as $$
declare
    res     text;
    pos     integer;
    s       text;
    n       text;
begin
    pos := position('.' in i_name);
    if pos > 0 then
        s := substring(i_name for pos - 1);
        n := substring(i_name from pos + 1);
    else
        s := 'public';
        n := i_name;
    end if;
    return quote_ident(s) || '.' || quote_ident(n);
end;
$$ language plpgsql strict immutable;


create or replace function londiste.set_last_tick(
    i_consumer text,
    i_tick_id bigint)
returns integer as $$
begin
    if i_tick_id is null then
        delete from londiste.completed
         where consumer_id = i_consumer;
    else
        update londiste.completed
           set last_tick_id = i_tick_id
         where consumer_id = i_consumer;
        if not found then
            insert into londiste.completed (consumer_id, last_tick_id)
                values (i_consumer, i_tick_id);
        end if;
    end if;

    return 1;
end;
$$ language plpgsql security definer;


create or replace function londiste.subscriber_add_seq(
    i_queue_name text, i_seq_name text)
returns integer as $$
declare
    link text;
begin
    insert into londiste.subscriber_seq (queue_name, seq_name)
        values (i_queue_name, i_seq_name);

    -- update linked queue if needed
    link := londiste.link_dest(i_queue_name);
    if link is not null then
        insert into londiste.provider_seq
            (queue_name, seq_name)
        values (link, i_seq_name);
        perform londiste.provider_notify_change(link);
    end if;

    return 0;
end;
$$ language plpgsql security definer;


create or replace function londiste.subscriber_add_table(
    i_queue_name text, i_table text)
returns integer as $$
begin
    insert into londiste.subscriber_table (queue_name, table_name)
        values (i_queue_name, i_table);

    -- linked queue is updated, when the table is copied

    return 0;
end;
$$ language plpgsql security definer;



create or replace function londiste.subscriber_get_table_pending_fkeys(i_table_name text) 
returns setof londiste.subscriber_pending_fkeys as $$
declare
    fkeys   record;
begin
    for fkeys in
        select *
        from londiste.subscriber_pending_fkeys
        where from_table=i_table_name or to_table=i_table_name
        order by 1,2,3
    loop
        return next fkeys;
    end loop;
    
    return;
end;
$$ language plpgsql;


create or replace function londiste.subscriber_get_queue_valid_pending_fkeys(i_queue_name text)
returns setof londiste.subscriber_pending_fkeys as $$
declare
    fkeys   record;
begin
    for fkeys in
        select pf.* from londiste.subscriber_pending_fkeys pf
            join londiste.subscriber_table st_from 
                on (st_from.table_name = pf.from_table and st_from.merge_state = 'ok' and st_from.snapshot is null)
            join londiste.subscriber_table st_to   
                on (st_to.table_name = pf.to_table and st_to.merge_state = 'ok' and st_to.snapshot is null)
            -- change the AND to OR to allow fkeys between tables coming from different queues
            where (st_from.queue_name = i_queue_name and st_to.queue_name = i_queue_name)
            order by 1, 2, 3
    loop
        return next fkeys;
    end loop;
    
    return;
end;
$$ language plpgsql;


create or replace function londiste.subscriber_drop_table_fkey(i_from_table text, i_fkey_name text)
returns integer as $$
declare
    fkey       record;
begin        
    select * into fkey
    from londiste.find_table_fkeys(i_from_table) 
    where fkey_name = i_fkey_name and from_table = i_from_table;
    
    if not found then
        return 0;
    end if;
            
    insert into londiste.subscriber_pending_fkeys values (fkey.from_table, fkey.to_table, i_fkey_name, fkey.fkey_def);
        
    execute 'alter table only ' || londiste.quote_fqname(fkey.from_table)
            || ' drop constraint ' || quote_ident(i_fkey_name);
    
    return 1;
end;
$$ language plpgsql;


create or replace function londiste.subscriber_restore_table_fkey(i_from_table text, i_fkey_name text)
returns integer as $$
declare
    fkey    record;
begin
    select * into fkey
    from londiste.subscriber_pending_fkeys 
    where fkey_name = i_fkey_name and from_table = i_from_table;
    
    if not found then
        return 0;
    end if;
    
    delete from londiste.subscriber_pending_fkeys where fkey_name = fkey.fkey_name;
        
    execute fkey.fkey_def;
        
    return 1;
end;
$$ language plpgsql;

create or replace function londiste.subscriber_get_seq_list(i_queue_name text)
returns setof text as $$
declare
    rec record;
begin
    for rec in
        select seq_name from londiste.subscriber_seq
            where queue_name = i_queue_name
            order by nr
    loop
        return next rec.seq_name;
    end loop;
    return;
end;
$$ language plpgsql security definer;


create or replace function londiste.subscriber_get_table_list(i_queue_name text)
returns setof londiste.ret_subscriber_table as $$
declare
    rec londiste.ret_subscriber_table%rowtype;
begin
    for rec in
        select table_name, merge_state, snapshot, trigger_name, skip_truncate
          from londiste.subscriber_table
         where queue_name = i_queue_name
         order by nr
    loop
        return next rec;
    end loop;
    return;
end;
$$ language plpgsql security definer;

-- compat
create or replace function londiste.get_table_state(i_queue text)
returns setof londiste.subscriber_table as $$
declare
    rec londiste.subscriber_table%rowtype;
begin
    for rec in
        select * from londiste.subscriber_table
            where queue_name = i_queue
            order by nr
    loop
        return next rec;
    end loop;
    return;
end;
$$ language plpgsql security definer;


create or replace function londiste.subscriber_remove_seq(
    i_queue_name text, i_seq_name text)
returns integer as $$
declare
    link text;
begin
    delete from londiste.subscriber_seq
        where queue_name = i_queue_name
          and seq_name = i_seq_name;
    if not found then
        raise exception 'no such seq?';
    end if;

    -- update linked queue if needed
    link := londiste.link_dest(i_queue_name);
    if link is not null then
        delete from londiste.provider_seq
         where queue_name = link
           and seq_name = i_seq_name;
        perform londiste.provider_notify_change(link);
    end if;

    return 0;
end;
$$ language plpgsql security definer;


create or replace function londiste.subscriber_remove_table(
    i_queue_name text, i_table text)
returns integer as $$
declare
    link  text;
begin
    delete from londiste.subscriber_table
     where queue_name = i_queue_name
       and table_name = i_table;
    if not found then
        raise exception 'no such table';
    end if;

    -- sync link
    link := londiste.link_dest(i_queue_name);
    if link is not null then
        delete from londiste.provider_table
            where queue_name = link
              and table_name = i_table;
        perform londiste.provider_notify_change(link);
    end if;

    return 0;
end;
$$ language plpgsql security definer;


create or replace function londiste.subscriber_set_skip_truncate(
    i_queue text,
    i_table text,
    i_value bool)
returns integer as $$
begin
    update londiste.subscriber_table
       set skip_truncate = i_value
     where queue_name = i_queue
       and table_name = i_table;
    if not found then
        raise exception 'table not found';
    end if;

    return 1;
end;
$$ language plpgsql security definer;


create or replace function londiste.subscriber_set_table_state(
    i_queue_name text,
    i_table_name text,
    i_snapshot text,
    i_merge_state text)
returns integer as $$
declare
    link  text;
    ok    integer;
begin
    update londiste.subscriber_table
        set snapshot = i_snapshot,
            merge_state = i_merge_state,
            -- reset skip_snapshot when table is copied over
            skip_truncate = case when i_merge_state = 'ok'
                                 then null
                                 else skip_truncate
                            end
      where queue_name = i_queue_name
        and table_name = i_table_name;
    if not found then
        raise exception 'no such table';
    end if;

    -- sync link state also
    link := londiste.link_dest(i_queue_name);
    if link then
        select * from londiste.provider_table
            where queue_name = linkdst
              and table_name = i_table_name;
        if found then
            if i_merge_state is null or i_merge_state <> 'ok' then
                delete from londiste.provider_table
                 where queue_name = link
                   and table_name = i_table_name;
                perform londiste.notify_change(link);
            end if;
        else
            if i_merge_state = 'ok' then
                insert into londiste.provider_table (queue_name, table_name)
                    values (link, i_table_name);
                perform londiste.notify_change(link);
            end if;
        end if;
    end if;

    return 1;
end;
$$ language plpgsql security definer;

create or replace function londiste.set_table_state(
    i_queue_name text,
    i_table_name text,
    i_snapshot text,
    i_merge_state text)
returns integer as $$
begin
    return londiste.subscriber_set_table_state(i_queue_name, i_table_name, i_snapshot, i_merge_state);
end;
$$ language plpgsql security definer;



create or replace function londiste.subscriber_get_table_pending_triggers(i_table_name text)
returns setof londiste.subscriber_pending_triggers as $$
declare
    trigger    record;
begin
    for trigger in
        select *
        from londiste.subscriber_pending_triggers
        where table_name = i_table_name
    loop
        return next trigger;
    end loop;
    
    return;
end;
$$ language plpgsql strict stable;


create or replace function londiste.subscriber_drop_table_trigger(i_table_name text, i_trigger_name text)
returns integer as $$
declare
    trig_def record;
begin
    select * into trig_def
    from londiste.find_table_triggers(i_table_name)
    where trigger_name = i_trigger_name;
    
    if FOUND is not true then
        return 0;
    end if;
    
    insert into londiste.subscriber_pending_triggers(table_name, trigger_name, trigger_def) 
        values (i_table_name, i_trigger_name, trig_def.trigger_def);
    
    execute 'drop trigger ' || quote_ident(i_trigger_name)
        || ' on ' || londiste.quote_fqname(i_table_name);
    
    return 1;
end;
$$ language plpgsql;


create or replace function londiste.subscriber_drop_all_table_triggers(i_table_name text)
returns integer as $$
declare
    trigger record;
begin
    for trigger in
        select trigger_name as name
        from londiste.find_table_triggers(i_table_name)
    loop
        perform londiste.subscriber_drop_table_trigger(i_table_name, trigger.name);
    end loop;
    
    return 1;
end;
$$ language plpgsql;


create or replace function londiste.subscriber_restore_table_trigger(i_table_name text, i_trigger_name text)
returns integer as $$
declare
    trig_def text;
begin
    select trigger_def into trig_def
    from londiste.subscriber_pending_triggers
    where (table_name, trigger_name) = (i_table_name, i_trigger_name);
    
    if not found then
        return 0;
    end if;
    
    delete from londiste.subscriber_pending_triggers 
    where table_name = i_table_name and trigger_name = i_trigger_name;
    
    execute trig_def;

    return 1;
end;
$$ language plpgsql;


create or replace function londiste.subscriber_restore_all_table_triggers(i_table_name text)
returns integer as $$
declare
    trigger record;
begin
    for trigger in
        select trigger_name as name
        from londiste.subscriber_get_table_pending_triggers(i_table_name)
    loop
        perform londiste.subscriber_restore_table_trigger(i_table_name, trigger.name);
    end loop;
    
    return 1;
end;
$$ language plpgsql;



create or replace function londiste.version()
returns text as $$
begin
    return '2.1.12';
end;
$$ language plpgsql;