This file is indexed.

/usr/lib/python2.7/dist-packages/celery/canvas.py is in python-celery 4.1.0-2ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
# -*- coding: utf-8 -*-
"""Composing task work-flows.

.. seealso:

    You should import these from :mod:`celery` and not this module.
"""
from __future__ import absolute_import, unicode_literals

import itertools
import operator
import sys

from collections import MutableSequence, deque
from copy import deepcopy
from functools import partial as _partial, reduce
from operator import itemgetter

from kombu.utils.functional import fxrange, reprcall
from kombu.utils.objects import cached_property
from kombu.utils.uuid import uuid
from vine import barrier

from celery._state import current_app
from celery.five import python_2_unicode_compatible
from celery.local import try_import
from celery.result import GroupResult
from celery.utils import abstract
from celery.utils.functional import (
    maybe_list, is_list, _regen, regen, chunks as _chunks,
    seq_concat_seq, seq_concat_item,
)
from celery.utils.objects import getitem_property
from celery.utils.text import truncate, remove_repeating_from_task

__all__ = [
    'Signature', 'chain', 'xmap', 'xstarmap', 'chunks',
    'group', 'chord', 'signature', 'maybe_signature',
]

PY3 = sys.version_info[0] == 3

# json in Python 2.7 borks if dict contains byte keys.
JSON_NEEDS_UNICODE_KEYS = PY3 and not try_import('simplejson')


def maybe_unroll_group(g):
    """Unroll group with only one member."""
    # Issue #1656
    try:
        size = len(g.tasks)
    except TypeError:
        try:
            size = g.tasks.__length_hint__()
        except (AttributeError, TypeError):
            return g
        else:
            return list(g.tasks)[0] if size == 1 else g
    else:
        return g.tasks[0] if size == 1 else g


def task_name_from(task):
    return getattr(task, 'name', task)


def _upgrade(fields, sig):
    """Used by custom signatures in .from_dict, to keep common fields."""
    sig.update(chord_size=fields.get('chord_size'))
    return sig


@abstract.CallableSignature.register
@python_2_unicode_compatible
class Signature(dict):
    """Task Signature.

    Class that wraps the arguments and execution options
    for a single task invocation.

    Used as the parts in a :class:`group` and other constructs,
    or to pass tasks around as callbacks while being compatible
    with serializers with a strict type subset.

    Signatures can also be created from tasks:

    - Using the ``.signature()`` method that has the same signature
      as ``Task.apply_async``:

        .. code-block:: pycon

            >>> add.signature(args=(1,), kwargs={'kw': 2}, options={})

    - or the ``.s()`` shortcut that works for star arguments:

        .. code-block:: pycon

            >>> add.s(1, kw=2)

    - the ``.s()`` shortcut does not allow you to specify execution options
      but there's a chaning `.set` method that returns the signature:

        .. code-block:: pycon

            >>> add.s(2, 2).set(countdown=10).set(expires=30).delay()

    Note:
        You should use :func:`~celery.signature` to create new signatures.
        The ``Signature`` class is the type returned by that function and
        should be used for ``isinstance`` checks for signatures.

    See Also:
        :ref:`guide-canvas` for the complete guide.

    Arguments:
        task (Task, str): Either a task class/instance, or the name of a task.
        args (Tuple): Positional arguments to apply.
        kwargs (Dict): Keyword arguments to apply.
        options (Dict): Additional options to :meth:`Task.apply_async`.

    Note:
        If the first argument is a :class:`dict`, the other
        arguments will be ignored and the values in the dict will be used
        instead::

            >>> s = signature('tasks.add', args=(2, 2))
            >>> signature(s)
            {'task': 'tasks.add', args=(2, 2), kwargs={}, options={}}
    """

    TYPES = {}
    _app = _type = None

    @classmethod
    def register_type(cls, name=None):
        def _inner(subclass):
            cls.TYPES[name or subclass.__name__] = subclass
            return subclass
        return _inner

    @classmethod
    def from_dict(cls, d, app=None):
        typ = d.get('subtask_type')
        if typ:
            target_cls = cls.TYPES[typ]
            if target_cls is not cls:
                return target_cls.from_dict(d, app=app)
        return Signature(d, app=app)

    def __init__(self, task=None, args=None, kwargs=None, options=None,
                 type=None, subtask_type=None, immutable=False,
                 app=None, **ex):
        self._app = app

        if isinstance(task, dict):
            super(Signature, self).__init__(task)  # works like dict(d)
        else:
            # Also supports using task class/instance instead of string name.
            try:
                task_name = task.name
            except AttributeError:
                task_name = task
            else:
                self._type = task

            super(Signature, self).__init__(
                task=task_name, args=tuple(args or ()),
                kwargs=kwargs or {},
                options=dict(options or {}, **ex),
                subtask_type=subtask_type,
                immutable=immutable,
                chord_size=None,
            )

    def __call__(self, *partial_args, **partial_kwargs):
        """Call the task directly (in the current process)."""
        args, kwargs, _ = self._merge(partial_args, partial_kwargs, None)
        return self.type(*args, **kwargs)

    def delay(self, *partial_args, **partial_kwargs):
        """Shortcut to :meth:`apply_async` using star arguments."""
        return self.apply_async(partial_args, partial_kwargs)

    def apply(self, args=(), kwargs={}, **options):
        """Call task locally.

        Same as :meth:`apply_async` but executed the task inline instead
        of sending a task message.
        """
        # For callbacks: extra args are prepended to the stored args.
        args, kwargs, options = self._merge(args, kwargs, options)
        return self.type.apply(args, kwargs, **options)

    def apply_async(self, args=(), kwargs={}, route_name=None, **options):
        """Apply this task asynchronously.

        Arguments:
            args (Tuple): Partial args to be prepended to the existing args.
            kwargs (Dict): Partial kwargs to be merged with existing kwargs.
            options (Dict): Partial options to be merged
                with existing options.

        Returns:
            ~@AsyncResult: promise of future evaluation.

        See also:
            :meth:`~@Task.apply_async` and the :ref:`guide-calling` guide.
        """
        try:
            _apply = self._apply_async
        except IndexError:  # pragma: no cover
            # no tasks for chain, etc to find type
            return
        # For callbacks: extra args are prepended to the stored args.
        if args or kwargs or options:
            args, kwargs, options = self._merge(args, kwargs, options)
        else:
            args, kwargs, options = self.args, self.kwargs, self.options
        # pylint: disable=too-many-function-args
        #   Borks on this, as it's a property
        return _apply(args, kwargs, **options)

    def _merge(self, args=(), kwargs={}, options={}, force=False):
        if self.immutable and not force:
            return (self.args, self.kwargs,
                    dict(self.options, **options) if options else self.options)
        return (tuple(args) + tuple(self.args) if args else self.args,
                dict(self.kwargs, **kwargs) if kwargs else self.kwargs,
                dict(self.options, **options) if options else self.options)

    def clone(self, args=(), kwargs={}, **opts):
        """Create a copy of this signature.

        Arguments:
            args (Tuple): Partial args to be prepended to the existing args.
            kwargs (Dict): Partial kwargs to be merged with existing kwargs.
            options (Dict): Partial options to be merged with
                existing options.
        """
        # need to deepcopy options so origins links etc. is not modified.
        if args or kwargs or opts:
            args, kwargs, opts = self._merge(args, kwargs, opts)
        else:
            args, kwargs, opts = self.args, self.kwargs, self.options
        s = Signature.from_dict({'task': self.task, 'args': tuple(args),
                                 'kwargs': kwargs, 'options': deepcopy(opts),
                                 'subtask_type': self.subtask_type,
                                 'chord_size': self.chord_size,
                                 'immutable': self.immutable}, app=self._app)
        s._type = self._type
        return s
    partial = clone

    def freeze(self, _id=None, group_id=None, chord=None,
               root_id=None, parent_id=None):
        """Finalize the signature by adding a concrete task id.

        The task won't be called and you shouldn't call the signature
        twice after freezing it as that'll result in two task messages
        using the same task id.

        Returns:
            ~@AsyncResult: promise of future evaluation.
        """
        # pylint: disable=redefined-outer-name
        #   XXX chord is also a class in outer scope.
        opts = self.options
        try:
            tid = opts['task_id']
        except KeyError:
            tid = opts['task_id'] = _id or uuid()
        if root_id:
            opts['root_id'] = root_id
        if parent_id:
            opts['parent_id'] = parent_id
        if 'reply_to' not in opts:
            opts['reply_to'] = self.app.oid
        if group_id:
            opts['group_id'] = group_id
        if chord:
            opts['chord'] = chord
        # pylint: disable=too-many-function-args
        #   Borks on this, as it's a property.
        return self.AsyncResult(tid)
    _freeze = freeze

    def replace(self, args=None, kwargs=None, options=None):
        """Replace the args, kwargs or options set for this signature.

        These are only replaced if the argument for the section is
        not :const:`None`.
        """
        s = self.clone()
        if args is not None:
            s.args = args
        if kwargs is not None:
            s.kwargs = kwargs
        if options is not None:
            s.options = options
        return s

    def set(self, immutable=None, **options):
        """Set arbitrary execution options (same as ``.options.update(…)``).

        Returns:
            Signature: This is a chaining method call
                (i.e., it will return ``self``).
        """
        if immutable is not None:
            self.set_immutable(immutable)
        self.options.update(options)
        return self

    def set_immutable(self, immutable):
        self.immutable = immutable

    def _with_list_option(self, key):
        items = self.options.setdefault(key, [])
        if not isinstance(items, MutableSequence):
            items = self.options[key] = [items]
        return items

    def append_to_list_option(self, key, value):
        items = self._with_list_option(key)
        if value not in items:
            items.append(value)
        return value

    def extend_list_option(self, key, value):
        items = self._with_list_option(key)
        items.extend(maybe_list(value))

    def link(self, callback):
        """Add callback task to be applied if this task succeeds.

        Returns:
            Signature: the argument passed, for chaining
                or use with :func:`~functools.reduce`.
        """
        return self.append_to_list_option('link', callback)

    def link_error(self, errback):
        """Add callback task to be applied on error in task execution.

        Returns:
            Signature: the argument passed, for chaining
                or use with :func:`~functools.reduce`.
        """
        return self.append_to_list_option('link_error', errback)

    def on_error(self, errback):
        """Version of :meth:`link_error` that supports chaining.

        on_error chains the original signature, not the errback so::

            >>> add.s(2, 2).on_error(errback.s()).delay()

        calls the ``add`` task, not the ``errback`` task, but the
        reverse is true for :meth:`link_error`.
        """
        self.link_error(errback)
        return self

    def flatten_links(self):
        """Return a recursive list of dependencies.

        "unchain" if you will, but with links intact.
        """
        return list(itertools.chain.from_iterable(itertools.chain(
            [[self]],
            (link.flatten_links()
                for link in maybe_list(self.options.get('link')) or [])
        )))

    def __or__(self, other):
        # These could be implemented in each individual class,
        # I'm sure, but for now we have this.
        if isinstance(other, chord) and len(other.tasks) == 1:
            # chord with one header -> header[0] | body
            other = other.tasks[0] | other.body

        if isinstance(self, group):
            if isinstance(other, group):
                # group() | group() -> single group
                return group(
                    itertools.chain(self.tasks, other.tasks), app=self.app)
            # group() | task -> chord
            if len(self.tasks) == 1:
                # group(ONE.s()) | other -> ONE.s() | other
                # Issue #3323
                return self.tasks[0] | other
            return chord(self, body=other, app=self._app)
        elif isinstance(other, group):
            # unroll group with one member
            other = maybe_unroll_group(other)
            if isinstance(self, _chain):
                # chain | group() -> chain
                sig = self.clone()
                sig.tasks.append(other)
                return sig
            # task | group() -> chain
            return _chain(self, other, app=self.app)

        if not isinstance(self, _chain) and isinstance(other, _chain):
            # task | chain -> chain
            return _chain(
                seq_concat_seq((self,), other.tasks), app=self._app)
        elif isinstance(other, _chain):
            # chain | chain -> chain
            sig = self.clone()
            if isinstance(sig.tasks, tuple):
                sig.tasks = list(sig.tasks)
            sig.tasks.extend(other.tasks)
            return sig
        elif isinstance(self, chord):
            # chord(ONE, body) | other -> ONE | body | other
            # chord with one header task is unecessary.
            if len(self.tasks) == 1:
                return self.tasks[0] | self.body | other
            # chord | task ->  attach to body
            sig = self.clone()
            sig.body = sig.body | other
            return sig
        elif isinstance(other, Signature):
            if isinstance(self, _chain):
                if isinstance(self.tasks[-1], group):
                    # CHAIN [last item is group] | TASK -> chord
                    sig = self.clone()
                    sig.tasks[-1] = chord(
                        sig.tasks[-1], other, app=self._app)
                    return sig
                elif isinstance(self.tasks[-1], chord):
                    # CHAIN [last item is chord] -> chain with chord body.
                    sig = self.clone()
                    sig.tasks[-1].body = sig.tasks[-1].body | other
                    return sig
                else:
                    # chain | task -> chain
                    return _chain(
                        seq_concat_item(self.tasks, other), app=self._app)
            # task | task -> chain
            return _chain(self, other, app=self._app)
        return NotImplemented

    def election(self):
        type = self.type
        app = type.app
        tid = self.options.get('task_id') or uuid()

        with app.producer_or_acquire(None) as P:
            props = type.backend.on_task_call(P, tid)
            app.control.election(tid, 'task', self.clone(task_id=tid, **props),
                                 connection=P.connection)
            return type.AsyncResult(tid)

    def reprcall(self, *args, **kwargs):
        args, kwargs, _ = self._merge(args, kwargs, {}, force=True)
        return reprcall(self['task'], args, kwargs)

    def __deepcopy__(self, memo):
        memo[id(self)] = self
        return dict(self)

    def __invert__(self):
        return self.apply_async().get()

    def __reduce__(self):
        # for serialization, the task type is lazily loaded,
        # and not stored in the dict itself.
        return signature, (dict(self),)

    def __json__(self):
        return dict(self)

    def __repr__(self):
        return self.reprcall()

    if JSON_NEEDS_UNICODE_KEYS:  # pragma: no cover
        def items(self):
            for k, v in dict.items(self):
                yield k.decode() if isinstance(k, bytes) else k, v

    @property
    def name(self):
        # for duck typing compatibility with Task.name
        return self.task

    @cached_property
    def type(self):
        return self._type or self.app.tasks[self['task']]

    @cached_property
    def app(self):
        return self._app or current_app

    @cached_property
    def AsyncResult(self):
        try:
            return self.type.AsyncResult
        except KeyError:  # task not registered
            return self.app.AsyncResult

    @cached_property
    def _apply_async(self):
        try:
            return self.type.apply_async
        except KeyError:
            return _partial(self.app.send_task, self['task'])
    id = getitem_property('options.task_id', 'Task UUID')
    parent_id = getitem_property('options.parent_id', 'Task parent UUID.')
    root_id = getitem_property('options.root_id', 'Task root UUID.')
    task = getitem_property('task', 'Name of task.')
    args = getitem_property('args', 'Positional arguments to task.')
    kwargs = getitem_property('kwargs', 'Keyword arguments to task.')
    options = getitem_property('options', 'Task execution options.')
    subtask_type = getitem_property('subtask_type', 'Type of signature')
    chord_size = getitem_property(
        'chord_size', 'Size of chord (if applicable)')
    immutable = getitem_property(
        'immutable', 'Flag set if no longer accepts new arguments')


@Signature.register_type(name='chain')
@python_2_unicode_compatible
class _chain(Signature):
    tasks = getitem_property('kwargs.tasks', 'Tasks in chain.')

    @classmethod
    def from_dict(cls, d, app=None):
        tasks = d['kwargs']['tasks']
        if tasks:
            if isinstance(tasks, tuple):  # aaaargh
                tasks = d['kwargs']['tasks'] = list(tasks)
            tasks = [maybe_signature(task, app=app) for task in tasks]
        return _upgrade(d, _chain(tasks, app=app, **d['options']))

    def __init__(self, *tasks, **options):
        tasks = (regen(tasks[0]) if len(tasks) == 1 and is_list(tasks[0])
                 else tasks)
        Signature.__init__(
            self, 'celery.chain', (), {'tasks': tasks}, **options
        )
        self._use_link = options.pop('use_link', None)
        self.subtask_type = 'chain'
        self._frozen = None

    def __call__(self, *args, **kwargs):
        if self.tasks:
            return self.apply_async(args, kwargs)

    def clone(self, *args, **kwargs):
        to_signature = maybe_signature
        s = Signature.clone(self, *args, **kwargs)
        s.kwargs['tasks'] = [
            to_signature(sig, app=self._app, clone=True)
            for sig in s.kwargs['tasks']
        ]
        return s

    def apply_async(self, args=(), kwargs={}, **options):
        # python is best at unpacking kwargs, so .run is here to do that.
        app = self.app
        if app.conf.task_always_eager:
            return self.apply(args, kwargs, **options)
        return self.run(args, kwargs, app=app, **(
            dict(self.options, **options) if options else self.options))

    def run(self, args=(), kwargs={}, group_id=None, chord=None,
            task_id=None, link=None, link_error=None, publisher=None,
            producer=None, root_id=None, parent_id=None, app=None, **options):
        # pylint: disable=redefined-outer-name
        #   XXX chord is also a class in outer scope.
        app = app or self.app
        use_link = self._use_link
        if use_link is None and app.conf.task_protocol == 1:
            use_link = True
        args = (tuple(args) + tuple(self.args)
                if args and not self.immutable else self.args)

        if self._frozen:
            tasks, results = self._frozen
        else:
            tasks, results = self.prepare_steps(
                args, self.tasks, root_id, parent_id, link_error, app,
                task_id, group_id, chord,
            )

        if results:
            if link:
                tasks[0].extend_list_option('link', link)
            first_task = tasks.pop()
            # chain option may already be set, resulting in
            # "multiple values for keyword argument 'chain'" error.
            # Issue #3379.
            options['chain'] = tasks if not use_link else None
            first_task.apply_async(**options)
            return results[0]

    def freeze(self, _id=None, group_id=None, chord=None,
               root_id=None, parent_id=None):
        # pylint: disable=redefined-outer-name
        #   XXX chord is also a class in outer scope.
        _, results = self._frozen = self.prepare_steps(
            self.args, self.tasks, root_id, parent_id, None,
            self.app, _id, group_id, chord, clone=False,
        )
        return results[0]

    def prepare_steps(self, args, tasks,
                      root_id=None, parent_id=None, link_error=None, app=None,
                      last_task_id=None, group_id=None, chord_body=None,
                      clone=True, from_dict=Signature.from_dict):
        app = app or self.app
        # use chain message field for protocol 2 and later.
        # this avoids pickle blowing the stack on the recursion
        # required by linking task together in a tree structure.
        # (why is pickle using recursion? or better yet why cannot python
        #  do tail call optimization making recursion actually useful?)
        use_link = self._use_link
        if use_link is None and app.conf.task_protocol == 1:
            use_link = True
        steps = deque(tasks)

        steps_pop = steps.pop
        steps_extend = steps.extend

        prev_task = None
        prev_res = None
        tasks, results = [], []
        i = 0
        # NOTE: We are doing this in reverse order.
        # The result is a list of tasks in reverse order, that is
        # passed as the ``chain`` message field.
        # As it's reversed the worker can just do ``chain.pop()`` to
        # get the next task in the chain.
        while steps:
            task = steps_pop()
            is_first_task, is_last_task = not steps, not i

            if not isinstance(task, abstract.CallableSignature):
                task = from_dict(task, app=app)
            if isinstance(task, group):
                task = maybe_unroll_group(task)

            # first task gets partial args from chain
            if clone:
                task = task.clone(args) if is_first_task else task.clone()
            elif is_first_task:
                task.args = tuple(args) + tuple(task.args)

            if isinstance(task, _chain):
                # splice the chain
                steps_extend(task.tasks)
                continue

            if isinstance(task, group) and prev_task:
                # automatically upgrade group(...) | s to chord(group, s)
                # for chords we freeze by pretending it's a normal
                # signature instead of a group.
                tasks.pop()
                results.pop()
                task = chord(
                    task, body=prev_task,
                    task_id=prev_res.task_id, root_id=root_id, app=app,
                )

            if is_last_task:
                # chain(task_id=id) means task id is set for the last task
                # in the chain.  If the chord is part of a chord/group
                # then that chord/group must synchronize based on the
                # last task in the chain, so we only set the group_id and
                # chord callback for the last task.
                res = task.freeze(
                    last_task_id,
                    root_id=root_id, group_id=group_id, chord=chord_body,
                )
            else:
                res = task.freeze(root_id=root_id)

            i += 1

            if prev_task:
                if use_link:
                    # link previous task to this task.
                    task.link(prev_task)

                if prev_res and not prev_res.parent:
                    prev_res.parent = res

            if link_error:
                for errback in maybe_list(link_error):
                    task.link_error(errback)

            tasks.append(task)
            results.append(res)

            prev_task, prev_res = task, res
            if isinstance(task, chord):
                app.backend.ensure_chords_allowed()
                # If the task is a chord, and the body is a chain
                # the chain has already been prepared, and res is
                # set to the last task in the callback chain.

                # We need to change that so that it points to the
                # group result object.
                node = res
                while node.parent:
                    node = node.parent
                prev_res = node
        return tasks, results

    def apply(self, args=(), kwargs={}, **options):
        last, fargs = None, args
        for task in self.tasks:
            res = task.clone(fargs).apply(
                last and (last.get(),), **dict(self.options, **options))
            res.parent, last, fargs = last, res, None
        return last

    @property
    def app(self):
        app = self._app
        if app is None:
            try:
                app = self.tasks[0]._app
            except LookupError:
                pass
        return app or current_app

    def __repr__(self):
        if not self.tasks:
            return '<{0}@{1:#x}: empty>'.format(
                type(self).__name__, id(self))
        return remove_repeating_from_task(
            self.tasks[0]['task'],
            ' | '.join(repr(t) for t in self.tasks))


class chain(_chain):
    """Chain tasks together.

    Each tasks follows one another,
    by being applied as a callback of the previous task.

    Note:
        If called with only one argument, then that argument must
        be an iterable of tasks to chain: this allows us
        to use generator expressions.

    Example:
        This is effectively :math:`((2 + 2) + 4)`:

        .. code-block:: pycon

            >>> res = chain(add.s(2, 2), add.s(4))()
            >>> res.get()
            8

        Calling a chain will return the result of the last task in the chain.
        You can get to the other tasks by following the ``result.parent``'s:

        .. code-block:: pycon

            >>> res.parent.get()
            4

        Using a generator expression:

        .. code-block:: pycon

            >>> lazy_chain = chain(add.s(i) for i in range(10))
            >>> res = lazy_chain(3)

    Arguments:
        *tasks (Signature): List of task signatures to chain.
            If only one argument is passed and that argument is
            an iterable, then that'll be used as the list of signatures
            to chain instead.  This means that you can use a generator
            expression.

    Returns:
        ~celery.chain: A lazy signature that can be called to apply the first
            task in the chain.  When that task succeeed the next task in the
            chain is applied, and so on.
    """

    # could be function, but must be able to reference as :class:`chain`.
    def __new__(cls, *tasks, **kwargs):
        # This forces `chain(X, Y, Z)` to work the same way as `X | Y | Z`
        if not kwargs and tasks:
            if len(tasks) == 1 and is_list(tasks[0]):
                # ensure chain(generator_expression) works.
                tasks = tasks[0]
            return reduce(operator.or_, tasks)
        return super(chain, cls).__new__(cls, *tasks, **kwargs)


class _basemap(Signature):
    _task_name = None
    _unpack_args = itemgetter('task', 'it')

    @classmethod
    def from_dict(cls, d, app=None):
        return _upgrade(
            d, cls(*cls._unpack_args(d['kwargs']), app=app, **d['options']),
        )

    def __init__(self, task, it, **options):
        Signature.__init__(
            self, self._task_name, (),
            {'task': task, 'it': regen(it)}, immutable=True, **options
        )

    def apply_async(self, args=(), kwargs={}, **opts):
        # need to evaluate generators
        task, it = self._unpack_args(self.kwargs)
        return self.type.apply_async(
            (), {'task': task, 'it': list(it)},
            route_name=task_name_from(self.kwargs.get('task')), **opts
        )


@Signature.register_type()
@python_2_unicode_compatible
class xmap(_basemap):
    """Map operation for tasks.

    Note:
        Tasks executed sequentially in process, this is not a
        parallel operation like :class:`group`.
    """

    _task_name = 'celery.map'

    def __repr__(self):
        task, it = self._unpack_args(self.kwargs)
        return '[{0}(x) for x in {1}]'.format(
            task.task, truncate(repr(it), 100))


@Signature.register_type()
@python_2_unicode_compatible
class xstarmap(_basemap):
    """Map operation for tasks, using star arguments."""

    _task_name = 'celery.starmap'

    def __repr__(self):
        task, it = self._unpack_args(self.kwargs)
        return '[{0}(*x) for x in {1}]'.format(
            task.task, truncate(repr(it), 100))


@Signature.register_type()
class chunks(Signature):
    """Partition of tasks in n chunks."""

    _unpack_args = itemgetter('task', 'it', 'n')

    @classmethod
    def from_dict(cls, d, app=None):
        return _upgrade(
            d, chunks(*cls._unpack_args(
                d['kwargs']), app=app, **d['options']),
        )

    def __init__(self, task, it, n, **options):
        Signature.__init__(
            self, 'celery.chunks', (),
            {'task': task, 'it': regen(it), 'n': n},
            immutable=True, **options
        )

    def __call__(self, **options):
        return self.apply_async(**options)

    def apply_async(self, args=(), kwargs={}, **opts):
        return self.group().apply_async(
            args, kwargs,
            route_name=task_name_from(self.kwargs.get('task')), **opts
        )

    def group(self):
        # need to evaluate generators
        task, it, n = self._unpack_args(self.kwargs)
        return group((xstarmap(task, part, app=self._app)
                      for part in _chunks(iter(it), n)),
                     app=self._app)

    @classmethod
    def apply_chunks(cls, task, it, n, app=None):
        return cls(task, it, n, app=app)()


def _maybe_group(tasks, app):
    if isinstance(tasks, dict):
        tasks = signature(tasks, app=app)

    if isinstance(tasks, (group, _chain)):
        tasks = tasks.tasks
    elif isinstance(tasks, abstract.CallableSignature):
        tasks = [tasks]
    else:
        tasks = [signature(t, app=app) for t in tasks]
    return tasks


@Signature.register_type()
@python_2_unicode_compatible
class group(Signature):
    """Creates a group of tasks to be executed in parallel.

    A group is lazy so you must call it to take action and evaluate
    the group.

    Note:
        If only one argument is passed, and that argument is an iterable
        then that'll be used as the list of tasks instead: this
        allows us to use ``group`` with generator expressions.

    Example:
        >>> lazy_group = group([add.s(2, 2), add.s(4, 4)])
        >>> promise = lazy_group()  # <-- evaluate: returns lazy result.
        >>> promise.get()  # <-- will wait for the task to return
        [4, 8]

    Arguments:
        *tasks (List[Signature]): A list of signatures that this group will
            call. If there's only one argument, and that argument is an
            iterable, then that'll define the list of signatures instead.
        **options (Any): Execution options applied to all tasks
            in the group.

    Returns:
        ~celery.group: signature that when called will then call all of the
            tasks in the group (and return a :class:`GroupResult` instance
            that can be used to inspect the state of the group).
    """

    tasks = getitem_property('kwargs.tasks', 'Tasks in group.')

    @classmethod
    def from_dict(cls, d, app=None):
        return _upgrade(
            d, group(d['kwargs']['tasks'], app=app, **d['options']),
        )

    def __init__(self, *tasks, **options):
        if len(tasks) == 1:
            tasks = tasks[0]
            if isinstance(tasks, group):
                tasks = tasks.tasks
            if not isinstance(tasks, _regen):
                tasks = regen(tasks)
        Signature.__init__(
            self, 'celery.group', (), {'tasks': tasks}, **options
        )
        self.subtask_type = 'group'

    def __call__(self, *partial_args, **options):
        return self.apply_async(partial_args, **options)

    def skew(self, start=1.0, stop=None, step=1.0):
        it = fxrange(start, stop, step, repeatlast=True)
        for task in self.tasks:
            task.set(countdown=next(it))
        return self

    def apply_async(self, args=(), kwargs=None, add_to_parent=True,
                    producer=None, link=None, link_error=None, **options):
        if link is not None:
            raise TypeError('Cannot add link to group: use a chord')
        if link_error is not None:
            raise TypeError(
                'Cannot add link to group: do that on individual tasks')
        app = self.app
        if app.conf.task_always_eager:
            return self.apply(args, kwargs, **options)
        if not self.tasks:
            return self.freeze()

        options, group_id, root_id = self._freeze_gid(options)
        tasks = self._prepared(self.tasks, [], group_id, root_id, app)
        p = barrier()
        results = list(self._apply_tasks(tasks, producer, app, p,
                                         args=args, kwargs=kwargs, **options))
        result = self.app.GroupResult(group_id, results, ready_barrier=p)
        p.finalize()

        # - Special case of group(A.s() | group(B.s(), C.s()))
        # That is, group with single item that's a chain but the
        # last task in that chain is a group.
        #
        # We cannot actually support arbitrary GroupResults in chains,
        # but this special case we can.
        if len(result) == 1 and isinstance(result[0], GroupResult):
            result = result[0]

        parent_task = app.current_worker_task
        if add_to_parent and parent_task:
            parent_task.add_trail(result)
        return result

    def apply(self, args=(), kwargs={}, **options):
        app = self.app
        if not self.tasks:
            return self.freeze()  # empty group returns GroupResult
        options, group_id, root_id = self._freeze_gid(options)
        tasks = self._prepared(self.tasks, [], group_id, root_id, app)
        return app.GroupResult(group_id, [
            sig.apply(args=args, kwargs=kwargs, **options) for sig, _ in tasks
        ])

    def set_immutable(self, immutable):
        for task in self.tasks:
            task.set_immutable(immutable)

    def link(self, sig):
        # Simply link to first task
        sig = sig.clone().set(immutable=True)
        return self.tasks[0].link(sig)

    def link_error(self, sig):
        sig = sig.clone().set(immutable=True)
        return self.tasks[0].link_error(sig)

    def _prepared(self, tasks, partial_args, group_id, root_id, app,
                  CallableSignature=abstract.CallableSignature,
                  from_dict=Signature.from_dict,
                  isinstance=isinstance, tuple=tuple):
        for task in tasks:
            if isinstance(task, CallableSignature):
                # local sigs are always of type Signature, and we
                # clone them to make sure we don't modify the originals.
                task = task.clone()
            else:
                # serialized sigs must be converted to Signature.
                task = from_dict(task, app=app)
            if isinstance(task, group):
                # needs yield_from :(
                unroll = task._prepared(
                    task.tasks, partial_args, group_id, root_id, app,
                )
                for taskN, resN in unroll:
                    yield taskN, resN
            else:
                if partial_args and not task.immutable:
                    task.args = tuple(partial_args) + tuple(task.args)
                yield task, task.freeze(group_id=group_id, root_id=root_id)

    def _apply_tasks(self, tasks, producer=None, app=None, p=None,
                     add_to_parent=None, chord=None,
                     args=None, kwargs=None, **options):
        # pylint: disable=redefined-outer-name
        #   XXX chord is also a class in outer scope.
        app = app or self.app
        with app.producer_or_acquire(producer) as producer:
            for sig, res in tasks:
                sig.apply_async(producer=producer, add_to_parent=False,
                                chord=sig.options.get('chord') or chord,
                                args=args, kwargs=kwargs,
                                **options)

                # adding callback to result, such that it will gradually
                # fulfill the barrier.
                #
                # Using barrier.add would use result.then, but we need
                # to add the weak argument here to only create a weak
                # reference to the object.
                if p and not p.cancelled and not p.ready:
                    p.size += 1
                    res.then(p, weak=True)
                yield res  # <-- r.parent, etc set in the frozen result.

    def _freeze_gid(self, options):
        # remove task_id and use that as the group_id,
        # if we don't remove it then every task will have the same id...
        options = dict(self.options, **options)
        options['group_id'] = group_id = (
            options.pop('task_id', uuid()))
        return options, group_id, options.get('root_id')

    def freeze(self, _id=None, group_id=None, chord=None,
               root_id=None, parent_id=None):
        # pylint: disable=redefined-outer-name
        #   XXX chord is also a class in outer scope.
        opts = self.options
        try:
            gid = opts['task_id']
        except KeyError:
            gid = opts['task_id'] = uuid()
        if group_id:
            opts['group_id'] = group_id
        if chord:
            opts['chord'] = chord
        root_id = opts.setdefault('root_id', root_id)
        parent_id = opts.setdefault('parent_id', parent_id)
        new_tasks = []
        # Need to unroll subgroups early so that chord gets the
        # right result instance for chord_unlock etc.
        results = list(self._freeze_unroll(
            new_tasks, group_id, chord, root_id, parent_id,
        ))
        if isinstance(self.tasks, MutableSequence):
            self.tasks[:] = new_tasks
        else:
            self.tasks = new_tasks
        return self.app.GroupResult(gid, results)
    _freeze = freeze

    def _freeze_unroll(self, new_tasks, group_id, chord, root_id, parent_id):
        # pylint: disable=redefined-outer-name
        #   XXX chord is also a class in outer scope.
        stack = deque(self.tasks)
        while stack:
            task = maybe_signature(stack.popleft(), app=self._app).clone()
            if isinstance(task, group):
                stack.extendleft(task.tasks)
            else:
                new_tasks.append(task)
                yield task.freeze(group_id=group_id,
                                  chord=chord, root_id=root_id,
                                  parent_id=parent_id)

    def __repr__(self):
        if self.tasks:
            return remove_repeating_from_task(
                self.tasks[0]['task'],
                'group({0.tasks!r})'.format(self))
        return 'group(<empty>)'

    def __len__(self):
        return len(self.tasks)

    @property
    def app(self):
        app = self._app
        if app is None:
            try:
                app = self.tasks[0].app
            except LookupError:
                pass
        return app if app is not None else current_app


@Signature.register_type()
@python_2_unicode_compatible
class chord(Signature):
    r"""Barrier synchronization primitive.

    A chord consists of a header and a body.

    The header is a group of tasks that must complete before the callback is
    called.  A chord is essentially a callback for a group of tasks.

    The body is applied with the return values of all the header
    tasks as a list.

    Example:

        The chord:

        .. code-block:: pycon

            >>> res = chord([add.s(2, 2), add.s(4, 4)])(sum_task.s())

        is effectively :math:`\Sigma ((2 + 2) + (4 + 4))`:

        .. code-block:: pycon

            >>> res.get()
            12
    """

    @classmethod
    def from_dict(cls, d, app=None):
        args, d['kwargs'] = cls._unpack_args(**d['kwargs'])
        return _upgrade(d, cls(*args, app=app, **d))

    @staticmethod
    def _unpack_args(header=None, body=None, **kwargs):
        # Python signatures are better at extracting keys from dicts
        # than manually popping things off.
        return (header, body), kwargs

    def __init__(self, header, body=None, task='celery.chord',
                 args=(), kwargs={}, app=None, **options):
        Signature.__init__(
            self, task, args,
            dict(kwargs=kwargs, header=_maybe_group(header, app),
                 body=maybe_signature(body, app=app)), app=app, **options
        )
        self.subtask_type = 'chord'

    def __call__(self, body=None, **options):
        return self.apply_async((), {'body': body} if body else {}, **options)

    def freeze(self, _id=None, group_id=None, chord=None,
               root_id=None, parent_id=None):
        # pylint: disable=redefined-outer-name
        #   XXX chord is also a class in outer scope.
        if not isinstance(self.tasks, group):
            self.tasks = group(self.tasks, app=self.app)
        header_result = self.tasks.freeze(
            parent_id=parent_id, root_id=root_id, chord=self.body)
        bodyres = self.body.freeze(_id, root_id=root_id)
        # we need to link the body result back to the group result,
        # but the body may actually be a chain,
        # so find the first result without a parent
        node = bodyres
        seen = set()
        while node:
            if node.id in seen:
                raise RuntimeError('Recursive result parents')
            seen.add(node.id)
            if node.parent is None:
                node.parent = header_result
                break
            node = node.parent
        self.id = self.tasks.id
        return bodyres

    def apply_async(self, args=(), kwargs={}, task_id=None,
                    producer=None, publisher=None, connection=None,
                    router=None, result_cls=None, **options):
        kwargs = kwargs or {}
        args = (tuple(args) + tuple(self.args)
                if args and not self.immutable else self.args)
        body = kwargs.pop('body', None) or self.kwargs['body']
        kwargs = dict(self.kwargs['kwargs'], **kwargs)
        body = body.clone(**options)
        app = self._get_app(body)
        tasks = (self.tasks.clone() if isinstance(self.tasks, group)
                 else group(self.tasks, app=app))
        if app.conf.task_always_eager:
            return self.apply(args, kwargs,
                              body=body, task_id=task_id, **options)
        if len(self.tasks) == 1:
            # chord([A], B) can be optimized as A | B
            # - Issue #3323
            return (self.tasks[0] | body).set(task_id=task_id).apply_async(
                args, kwargs, **options)
        # chord([A, B, ...], C)
        return self.run(tasks, body, args, task_id=task_id, **options)

    def apply(self, args=(), kwargs={}, propagate=True, body=None, **options):
        body = self.body if body is None else body
        tasks = (self.tasks.clone() if isinstance(self.tasks, group)
                 else group(self.tasks, app=self.app))
        return body.apply(
            args=(tasks.apply(args, kwargs).get(propagate=propagate),),
        )

    def _traverse_tasks(self, tasks, value=None):
        stack = deque(tasks)
        while stack:
            task = stack.popleft()
            if isinstance(task, group):
                stack.extend(task.tasks)
            else:
                yield task if value is None else value

    def __length_hint__(self):
        tasks = (self.tasks.tasks if isinstance(self.tasks, group)
                 else self.tasks)
        return sum(self._traverse_tasks(tasks, 1))

    def run(self, header, body, partial_args, app=None, interval=None,
            countdown=1, max_retries=None, eager=False,
            task_id=None, **options):
        app = app or self._get_app(body)
        group_id = header.options.get('task_id') or uuid()
        root_id = body.options.get('root_id')
        body.chord_size = self.__length_hint__()
        options = dict(self.options, **options) if options else self.options
        if options:
            options.pop('task_id', None)
            body.options.update(options)

        results = header.freeze(
            group_id=group_id, chord=body, root_id=root_id).results
        bodyres = body.freeze(task_id, root_id=root_id)

        # Chains should not be passed to the header tasks. See #3771
        options.pop('chain', None)

        parent = app.backend.apply_chord(
            header, partial_args, group_id, body,
            interval=interval, countdown=countdown,
            options=options, max_retries=max_retries,
            result=results)
        bodyres.parent = parent
        return bodyres

    def clone(self, *args, **kwargs):
        s = Signature.clone(self, *args, **kwargs)
        # need to make copy of body
        try:
            s.kwargs['body'] = maybe_signature(s.kwargs['body'], clone=True)
        except (AttributeError, KeyError):
            pass
        return s

    def link(self, callback):
        self.body.link(callback)
        return callback

    def link_error(self, errback):
        self.body.link_error(errback)
        return errback

    def set_immutable(self, immutable):
        # changes mutability of header only, not callback.
        for task in self.tasks:
            task.set_immutable(immutable)

    def __repr__(self):
        if self.body:
            if isinstance(self.body, _chain):
                return remove_repeating_from_task(
                    self.body.tasks[0]['task'],
                    '%({0} | {1!r})'.format(
                        self.body.tasks[0].reprcall(self.tasks),
                        chain(self.body.tasks[1:], app=self._app),
                    ),
                )
            return '%' + remove_repeating_from_task(
                self.body['task'], self.body.reprcall(self.tasks))
        return '<chord without body: {0.tasks!r}>'.format(self)

    @cached_property
    def app(self):
        return self._get_app(self.body)

    def _get_app(self, body=None):
        app = self._app
        if app is None:
            try:
                tasks = self.tasks.tasks  # is a group
            except AttributeError:
                tasks = self.tasks
            if len(tasks):
                app = tasks[0]._app
            if app is None and body is not None:
                app = body._app
        return app if app is not None else current_app

    tasks = getitem_property('kwargs.header', 'Tasks in chord header.')
    body = getitem_property('kwargs.body', 'Body task of chord.')


def signature(varies, *args, **kwargs):
    """Create new signature.

    - if the first argument is a signature already then it's cloned.
    - if the first argument is a dict, then a Signature version is returned.

    Returns:
        Signature: The resulting signature.
    """
    app = kwargs.get('app')
    if isinstance(varies, dict):
        if isinstance(varies, abstract.CallableSignature):
            return varies.clone()
        return Signature.from_dict(varies, app=app)
    return Signature(varies, *args, **kwargs)
subtask = signature  # noqa: E305 XXX compat


def maybe_signature(d, app=None, clone=False):
    """Ensure obj is a signature, or None.

    Arguments:
        d (Optional[Union[abstract.CallableSignature, Mapping]]):
            Signature or dict-serialized signature.
        app (celery.Celery):
            App to bind signature to.
        clone (bool):
            If d' is already a signature, the signature
           will be cloned when this flag is enabled.

    Returns:
        Optional[abstract.CallableSignature]
    """
    if d is not None:
        if isinstance(d, abstract.CallableSignature):
            if clone:
                d = d.clone()
        elif isinstance(d, dict):
            d = signature(d)

        if app is not None:
            d._app = app
    return d
maybe_subtask = maybe_signature  # noqa: E305 XXX compat