Manage TiCDC Cluster and Replication Tasks

This document describes how to upgrade TiCDC cluster and modify the configuration of TiCDC cluster using TiUP, and how to manage the TiCDC cluster and replication tasks using the command-line tool cdc cli.

You can also use the HTTP interface (the TiCDC OpenAPI feature) to manage the TiCDC cluster and replication tasks. For details, see TiCDC OpenAPI.

Upgrade TiCDC using TiUP

This section introduces how to upgrade the TiCDC cluster using TiUP. In the following example, assume that you need to upgrade TiCDC and the entire TiDB cluster to v6.2.0.

tiup update --self && \
tiup update --all && \
tiup cluster upgrade <cluster-name> v6.2.0

Notes for upgrade

Modify TiCDC configuration using TiUP

This section introduces how to modify the configuration of TiCDC cluster using the tiup cluster edit-config command of TiUP. The following example changes the value of gc-ttl from the default 86400 to 3600, namely, one hour.

First, execute the following command. You need to replace <cluster-name> with your actual cluster name.

tiup cluster edit-config <cluster-name>

Then, enter the vi editor page and modify the cdc configuraion under server-configs. The configuration is shown below:

 server_configs:
  tidb: {}
  tikv: {}
  pd: {}
  tiflash: {}
  tiflash-learner: {}
  pump: {}
  drainer: {}
  cdc:
    gc-ttl: 3600

After the modification, execute the tiup cluster reload -R cdc command to reload the configuration.

Use TLS

For details about using encrypted data transmission (TLS), see Enable TLS Between TiDB Components.

Use cdc cli to manage cluster status and data replication task

This section introduces how to use cdc cli to manage a TiCDC cluster and data replication tasks. cdc cli is the cli sub-command executed using the cdc binary. The following description assumes that:

  • cli commands are executed directly using the cdc binary;
  • PD listens on 10.0.10.25 and the port is 2379.
Note

The IP address and port that PD listens on correspond to the advertise-client-urls parameter specified during the pd-server startup. Multiple pd-servers have multiple advertise-client-urls parameters and you can specify one or multiple parameters. For example, --pd=http://10.0.10.25:2379 or --pd=http://10.0.10.25:2379,http://10.0.10.26:2379,http://10.0.10.27:2379.

If you deploy TiCDC using TiUP, replace cdc cli in the following commands with tiup ctl cdc.

Manage TiCDC service progress (capture)

  • Query the capture list:

    cdc cli capture list --pd=http://10.0.10.25:2379
    
    [
      {
        "id": "806e3a1b-0e31-477f-9dd6-f3f2c570abdd",
        "is-owner": true,
        "address": "127.0.0.1:8300"
      },
      {
        "id": "ea2a4203-56fe-43a6-b442-7b295f458ebc",
        "is-owner": false,
        "address": "127.0.0.1:8301"
      }
    ]
    
    • id: The ID of the service process.
    • is-owner: Indicates whether the service process is the owner node.
    • address: The address via which the service process provides interface to the outside.

Manage replication tasks (changefeed)

State transfer of replication tasks

The state of a replication task represents the running status of the replication task. During the running of TiCDC, replication tasks might fail with errors, be manually paused, resumed, or reach the specified TargetTs. These behaviors can lead to the change of the replication task state. This section describes the states of TiCDC replication tasks and the transfer relationships between states.

TiCDC state transfer

The states in the above state transfer diagram are described as follows:

  • Normal: The replication task runs normally and the checkpoint-ts proceeds normally.
  • Stopped: The replication task is stopped, because the user manually pauses the changefeed. The changefeed in this state blocks GC operations.
  • Error: The replication task returns an error. The replication cannot continue due to some recoverable errors. The changefeed in this state keeps trying to resume until the state transfers to Normal. The changefeed in this state blocks GC operations.
  • Finished: The replication task is finished and has reached the preset TargetTs. The changefeed in this state does not block GC operations.
  • Failed: The replication task fails. Due to some unrecoverable errors, the replication task cannot resume and cannot be recovered. The changefeed in this state does not block GC operations.

The numbers in the above state transfer diagram are described as follows.

  • ① Execute the changefeed pause command
  • ② Execute the changefeed resume command to resume the replication task
  • ③ Recoverable errors occur during the changefeed operation, and the operation is resumed automatically.
  • ④ Execute the changefeed resume command to resume the replication task
  • ⑤ Recoverable errors occur during the changefeed operation
  • changefeed has reached the preset TargetTs, and the replication is automatically stopped.
  • changefeed suspended longer than the duration specified by gc-ttl, and cannot be resumed.
  • changefeed experienced an unrecoverable error when trying to execute automatic recovery.

Create a replication task

Execute the following commands to create a replication task:

cdc cli changefeed create --pd=http://10.0.10.25:2379 --sink-uri="mysql://root:123456@127.0.0.1:3306/" --changefeed-id="simple-replication-task" --sort-engine="unified"
Create changefeed successfully!
ID: simple-replication-task
Info: {"sink-uri":"mysql://root:123456@127.0.0.1:3306/","opts":{},"create-time":"2020-03-12T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null}
  • --changefeed-id: The ID of the replication task. The format must match the ^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$ regular expression. If this ID is not specified, TiCDC automatically generates a UUID (the version 4 format) as the ID.

  • --sink-uri: The downstream address of the replication task. Configure --sink-uri according to the following format. Currently, the scheme supports mysql/tidb/kafka/pulsar/s3/local.

    [scheme]://[userinfo@][host]:[port][/path]?[query_parameters]
    

    When a URI contains special characters, you need to process these special characters using URL encoding.

  • --start-ts: Specifies the starting TSO of the changefeed. From this TSO, the TiCDC cluster starts pulling data. The default value is the current time.

  • --target-ts: Specifies the ending TSO of the changefeed. To this TSO, the TiCDC cluster stops pulling data. The default value is empty, which means that TiCDC does not automatically stop pulling data.

  • --sort-engine: Specifies the sorting engine for the changefeed. Because TiDB and TiKV adopt distributed architectures, TiCDC must sort the data changes before writing them to the sink. This option supports unified (by default)/memory/file.

    • unified: When unified is used, TiCDC prefers data sorting in memory. If the memory is insufficient, TiCDC automatically uses the disk to store the temporary data. This is the default value of --sort-engine.
    • memory: Sorts data changes in memory. It is NOT recommended to use this sorting engine, because OOM is easily triggered when you replicate a large amount of data.
    • file: Entirely uses the disk to store the temporary data. This feature is deprecated. It is NOT recommended to use it in any situation.
  • --config: Specifies the configuration file of the changefeed.

  • sort-dir: Specifies the temporary file directory used by the sorting engine. Note that this option is not supported since TiDB v4.0.13, v5.0.3 and v5.1.0. Do not use it any more.

Configure sink URI with mysql/tidb

Sample configuration:

--sink-uri="mysql://root:123456@127.0.0.1:3306/?worker-count=16&max-txn-row=5000"

The following are descriptions of parameters and parameter values that can be configured for the sink URI with mysql/tidb:

Parameter/Parameter ValueDescription
rootThe username of the downstream database
123456The password of the downstream database
127.0.0.1The IP address of the downstream database
3306The port for the downstream data
worker-countThe number of SQL statements that can be concurrently executed to the downstream (optional, 16 by default)
max-txn-rowThe size of a transaction batch that can be executed to the downstream (optional, 256 by default)
ssl-caThe path of the CA certificate file needed to connect to the downstream MySQL instance (optional)
ssl-certThe path of the certificate file needed to connect to the downstream MySQL instance (optional)
ssl-keyThe path of the certificate key file needed to connect to the downstream MySQL instance (optional)
time-zoneThe time zone used when connecting to the downstream MySQL instance, which is effective since v4.0.8. This is an optional parameter. If this parameter is not specified, the time zone of TiCDC service processes is used. If this parameter is set to an empty value, no time zone is specified when TiCDC connects to the downstream MySQL instance and the default time zone of the downstream is used.

Configure sink URI with kafka

Sample configuration:

--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"

The following are descriptions of parameters and parameter values that can be configured for the sink URI with kafka:

Parameter/Parameter ValueDescription
127.0.0.1The IP address of the downstream Kafka services
9092The port for the downstream Kafka
topic-nameVariable. The name of the Kafka topic
kafka-versionThe version of the downstream Kafka (optional, 2.4.0 by default. Currently, the earliest supported Kafka version is 0.11.0.2 and the latest one is 2.7.0. This value needs to be consistent with the actual version of the downstream Kafka)
kafka-client-idSpecifies the Kafka client ID of the replication task (optional. TiCDC_sarama_producer_replication ID by default)
partition-numThe number of the downstream Kafka partitions (optional. The value must be no greater than the actual number of partitions; otherwise, the replication task cannot be created successfully. 3 by default)
max-message-bytesThe maximum size of data that is sent to Kafka broker each time (optional, 10MB by default). From v5.0.6 and v4.0.6, the default value has changed from 64MB and 256MB to 10MB.
replication-factorThe number of Kafka message replicas that can be saved (optional, 1 by default)
protocolThe protocol with which messages are output to Kafka. The value options are canal-json, open-protocol, canal, avro and maxwell.
auto-create-topicDetermines whether TiCDC creates the topic automatically when the topic-name passed in does not exist in the Kafka cluster (optional, true by default)
enable-tidb-extensionOptional. false by default. When the output protocol is canal-json, if the value is true, TiCDC sends Resolved events and adds the TiDB extension field to the Kafka message. From v6.1.0, this parameter is also applicable to the avro protocol. If the value is true, TiCDC adds three TiDB extension fields to the Kafka message.
max-batch-sizeNew in v4.0.9. If the message protocol supports outputting multiple data changes to one Kafka message, this parameter specifies the maximum number of data changes in one Kafka message. It currently takes effect only when Kafka's protocol is open-protocol. (optional, 16 by default)
enable-tlsWhether to use TLS to connect to the downstream Kafka instance (optional, false by default)
caThe path of the CA certificate file needed to connect to the downstream Kafka instance (optional)
certThe path of the certificate file needed to connect to the downstream Kafka instance (optional)
keyThe path of the certificate key file needed to connect to the downstream Kafka instance (optional)
sasl-userThe identity (authcid) of SASL/PLAIN or SASL/SCRAM authentication needed to connect to the downstream Kafka instance (optional)
sasl-passwordThe password of SASL/PLAIN or SASL/SCRAM authentication needed to connect to the downstream Kafka instance (optional)
sasl-mechanismThe name of SASL authentication needed to connect to the downstream Kafka instance. The value can be plain, scram-sha-256, scram-sha-512, or gssapi.
sasl-gssapi-auth-typeThe gssapi authentication type. Values can be user or keytab (optional)
sasl-gssapi-keytab-pathThe gssapi keytab path (optional)
sasl-gssapi-kerberos-config-pathThe gssapi kerberos configuration path (optional)
sasl-gssapi-service-nameThe gssapi service name (optional)
sasl-gssapi-userThe user name of gssapi authentication (optional)
sasl-gssapi-passwordThe password of gssapi authentication (optional)
sasl-gssapi-realmThe gssapi realm name (optional)
sasl-gssapi-disable-pafxfastWhether to disable the gssapi PA-FX-FAST (optional)
dial-timeoutThe timeout in establishing a connection with the downstream Kafka. The default value is 10s
read-timeoutThe timeout in getting a response returned by the downstream Kafka. The default value is 10s
write-timeoutThe timeout in sending a request to the downstream Kafka. The default value is 10s
avro-decimal-handling-modeOnly effective with the avro protocol. Determines how Avro handles the DECIMAL field. The value can be string or precise, indicating either mapping the DECIMAL field to a string or a precise floating number.
avro-bigint-unsigned-handling-modeOnly effective with the avro protocol. Determines how Avro handles the BIGINT UNSIGNED field. The value can be string or long, indicating either mapping the BIGINT UNSIGNED field to a 64-bit signed number or a string.

Best practices:

  • It is recommended that you create your own Kafka Topic. At a minimum, you need to set the maximum amount of data of each message that the Topic can send to the Kafka broker, and the number of downstream Kafka partitions. When you create a changefeed, these two settings correspond to max-message-bytes and partition-num, respectively.
  • If you create a changefeed with a Topic that does not yet exist, TiCDC will try to create the Topic using the partition-num and replication-factor parameters. It is recommended that you specify these parameters explicitly.
  • In most cases, it is recommended to use the canal-json protocol.
Note

When protocol is open-protocol, TiCDC tries to avoid generating messages that exceed max-message-bytes in length. However, if a row is so large that a single change alone exceeds max-message-bytes in length, to avoid silent failure, TiCDC tries to output this message and prints a warning in the log.

TiCDC uses the authentication and authorization of Kafka

The following are examples when using Kafka SASL authentication:

  • SASL/PLAIN

    --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-user=alice-user&sasl-password=alice-secret&sasl-mechanism=plain"
    
  • SASL/SCRAM

    SCRAM-SHA-256 and SCRAM-SHA-512 are similar to the PLAIN method. You just need to specify sasl-mechanism as the corresponding authentication method.

  • SASL/GSSAPI

    SASL/GSSAPI user authentication:

    --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=user&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-password=alice-secret&sasl-gssapi-realm=example.com"
    

    Values of sasl-gssapi-user and sasl-gssapi-realm are related to the principle specified in kerberos. For example, if the principle is set as alice/for-kafka@example.com, then sasl-gssapi-user and sasl-gssapi-realm are specified as alice/for-kafka and example.com respectively.

    SASL/GSSAPI keytab authentication:

    --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=keytab&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-keytab-path=/var/lib/secret/alice.key&sasl-gssapi-realm=example.com"
    

    For more information about SASL/GSSAPI authentication methods, see Configuring GSSAPI.

  • TLS/SSL encryption

    If the Kafka broker has TLS/SSL encryption enabled, you need to add the -enable-tls=true parameter to --sink-uri. If you want to use self-signed certificates, you also need to specify ca, cert and key in --sink-uri.

  • ACL authorization

    The minimum set of permissions required for TiCDC to function properly is as follows.

    • The Create and Write permissions for the Topic resource type.
    • The DescribeConfigs permission for the Cluster resource type.

Integrate TiCDC with Kafka Connect (Confluent Platform)

To use the data connectors provided by Confluent to stream data to relational or non-relational databases, you need to use the avro protocol and provide a URL for Confluent Schema Registry in schema-registry.

Sample configuration:

--sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --schema-registry="http://127.0.0.1:8081" --config changefeed_config.toml
[sink]
dispatchers = [
 {matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]

For detailed integration guide, see Quick Start Guide on Integrating TiDB with Confluent Platform.

Configure sink URI with pulsar

Warning

This is still an experimental feature. Do NOT use it in a production environment.

Sample configuration:

--sink-uri="pulsar://127.0.0.1:6650/topic-name?connectionTimeout=2s"

The following are descriptions of parameters that can be configured for the sink URI with pulsar:

ParameterDescription
connectionTimeoutThe timeout for establishing a connection to the downstream Pulsar, which is optional and defaults to 30 (seconds)
operationTimeoutThe timeout for performing an operation on the downstream Pulsar, which is optional and defaults to 30 (seconds)
tlsTrustCertsFilePathThe path of the CA certificate file needed to connect to the downstream Pulsar instance (optional)
tlsAllowInsecureConnectionDetermines whether to allow unencrypted connection after TLS is enabled (optional)
tlsValidateHostnameDetermines whether to verify the host name of the certificate from the downstream Pulsar (optional)
maxConnectionsPerBrokerThe maximum number of connections allowed to a single downstream Pulsar broker, which is optional and defaults to 1
auth.tlsUses the TLS mode to verify the downstream Pulsar (optional). For example, auth=tls&auth.tlsCertFile=/path/to/cert&auth.tlsKeyFile=/path/to/key.
auth.tokenUses the token mode to verify the downstream Pulsar (optional). For example, auth=token&auth.token=secret-token or auth=token&auth.file=path/to/secret-token-file.
nameThe name of Pulsar producer in TiCDC (optional)
protocolThe protocol with which messages are output to Pulsar. The value options are canal-json, open-protocol, canal, avro, and maxwell.
maxPendingMessagesSets the maximum size of the pending message queue, which is optional and defaults to 1000. For example, pending for the confirmation message from Pulsar.
disableBatchingDisables automatically sending messages in batches (optional)
batchingMaxPublishDelaySets the duration within which the messages sent are batched (default: 10ms)
compressionTypeSets the compression algorithm used for sending messages (optional). The value options are NONE, LZ4, ZLIB, and ZSTD. (NONE by default)
hashingSchemeThe hash algorithm used for choosing the partition to which a message is sent (optional). The value options are JavaStringHash (default) and Murmur3.
properties.*The customized properties added to the Pulsar producer in TiCDC (optional). For example, properties.location=Hangzhou.

For more parameters of Pulsar, see pulsar-client-go ClientOptions and pulsar-client-go ProducerOptions.

Use the task configuration file

For more replication configuration (for example, specify replicating a single table), see Task configuration file.

You can use a configuration file to create a replication task in the following way:

cdc cli changefeed create --pd=http://10.0.10.25:2379 --sink-uri="mysql://root:123456@127.0.0.1:3306/" --config changefeed.toml

In the command above, changefeed.toml is the configuration file for the replication task.

Query the replication task list

Execute the following command to query the replication task list:

cdc cli changefeed list --pd=http://10.0.10.25:2379
[{
    "id": "simple-replication-task",
    "summary": {
      "state": "normal",
      "tso": 417886179132964865,
      "checkpoint": "2020-07-07 16:07:44.881",
      "error": null
    }
}]
  • checkpoint indicates that TiCDC has already replicated data before this time point to the downstream.
  • state indicates the state of the replication task.
    • normal: The replication task runs normally.
    • stopped: The replication task is stopped (manually paused).
    • error: The replication task is stopped (by an error).
    • removed: The replication task is removed. Tasks of this state are displayed only when you have specified the --all option. To see these tasks when this option is not specified, execute the changefeed query command.
    • finished: The replication task is finished (data is replicated to the target-ts). Tasks of this state are displayed only when you have specified the --all option. To see these tasks when this option is not specified, execute the changefeed query command.

Query a specific replication task

To query a specific replication task, execute the changefeed query command. The query result includes the task information and the task state. You can specify the --simple or -s argument to simplify the query result that will only include the basic replication state and the checkpoint information. If you do not specify this argument, detailed task configuration, replication states, and replication table information are output.

cdc cli changefeed query -s --pd=http://10.0.10.25:2379 --changefeed-id=simple-replication-task
{
 "state": "normal",
 "tso": 419035700154597378,
 "checkpoint": "2020-08-27 10:12:19.579",
 "error": null
}

In the command and result above:

  • state is the replication state of the current changefeed. Each state must be consistent with the state in changefeed list.
  • tso represents the largest transaction TSO in the current changefeed that has been successfully replicated to the downstream.
  • checkpoint represents the corresponding time of the largest transaction TSO in the current changefeed that has been successfully replicated to the downstream.
  • error records whether an error has occurred in the current changefeed.
cdc cli changefeed query --pd=http://10.0.10.25:2379 --changefeed-id=simple-replication-task
{
  "info": {
    "sink-uri": "mysql://127.0.0.1:3306/?max-txn-row=20\u0026worker-number=4",
    "opts": {},
    "create-time": "2020-08-27T10:33:41.687983832+08:00",
    "start-ts": 419036036249681921,
    "target-ts": 0,
    "admin-job-type": 0,
    "sort-engine": "unified",
    "sort-dir": ".",
    "config": {
      "case-sensitive": true,
      "enable-old-value": false,
      "filter": {
        "rules": [
          "*.*"
        ],
        "ignore-txn-start-ts": null,
        "ddl-allow-list": null
      },
      "mounter": {
        "worker-num": 16
      },
      "sink": {
        "dispatchers": null,
      },
      "scheduler": {
        "type": "table-number",
        "polling-time": -1
      }
    },
    "state": "normal",
    "history": null,
    "error": null
  },
  "status": {
    "resolved-ts": 419036036249681921,
    "checkpoint-ts": 419036036249681921,
    "admin-job-type": 0
  },
  "count": 0,
  "task-status": [
    {
      "capture-id": "97173367-75dc-490c-ae2d-4e990f90da0f",
      "status": {
        "tables": {
          "47": {
            "start-ts": 419036036249681921
          }
        },
        "operation": null,
        "admin-job-type": 0
      }
    }
  ]
}

In the command and result above:

  • info is the replication configuration of the queried changefeed.
  • status is the replication state of the queried changefeed.
    • resolved-ts: The largest transaction TS in the current changefeed. Note that this TS has been successfully sent from TiKV to TiCDC.
    • checkpoint-ts: The largest transaction TS in the current changefeed. Note that this TS has been successfully written to the downstream.
    • admin-job-type: The status of a changefeed:
      • 0: The state is normal.
      • 1: The task is paused. When the task is paused, all replicated processors exit. The configuration and the replication status of the task are retained, so you can resume the task from checkpiont-ts.
      • 2: The task is resumed. The replication task resumes from checkpoint-ts.
      • 3: The task is removed. When the task is removed, all replicated processors are ended, and the configuration information of the replication task is cleared up. Only the replication status is retained for later queries.
  • task-status indicates the state of each replication sub-task in the queried changefeed.

Pause a replication task

Execute the following command to pause a replication task:

cdc cli changefeed pause --pd=http://10.0.10.25:2379 --changefeed-id simple-replication-task

In the above command:

  • --changefeed-id=uuid represents the ID of the changefeed that corresponds to the replication task you want to pause.

Resume a replication task

Execute the following command to resume a paused replication task:

cdc cli changefeed resume --pd=http://10.0.10.25:2379 --changefeed-id simple-replication-task
  • --changefeed-id=uuid represents the ID of the changefeed that corresponds to the replication task you want to resume.
  • --overwrite-checkpoint-ts: starting from v6.2.0, you can specify the starting TSO of resuming the replication task. TiCDC starts pulling data from the specified TSO. The argument accepts now or a specific TSO (such as 434873584621453313). The specified TSO must be in the range of (GC safe point, CurrentTSO]. If this argument is not specified, TiCDC replicates data from the current checkpoint-ts by default.
  • --no-confirm: when the replication is resumed, you do not need to confirm the related information. Defaults to false.
Note
  • If the TSO specified in --overwrite-checkpoint-ts (t2) is larger than the current checkpoint TSO in the changefeed (t1), data between t1 and t2 will not be replicated to the downstream. This causes data loss. You can obtain t1 by running cdc cli changefeed query.
  • If the TSO specified in --overwrite-checkpoint-ts (t2) is smaller than the current checkpoint TSO in the changefeed (t1), TiCDC pulls data from an old time point (t2), which might cause data duplication (for example, if the downstream is MQ sink).

Remove a replication task

Execute the following command to remove a replication task:

cdc cli changefeed remove --pd=http://10.0.10.25:2379 --changefeed-id simple-replication-task

In the above command:

  • --changefeed-id=uuid represents the ID of the changefeed that corresponds to the replication task you want to remove.

Update task configuration

Starting from v4.0.4, TiCDC supports modifying the configuration of the replication task (not dynamically). To modify the changefeed configuration, pause the task, modify the configuration, and then resume the task.

cdc cli changefeed pause -c test-cf --pd=http://10.0.10.25:2379
cdc cli changefeed update -c test-cf --pd=http://10.0.10.25:2379 --sink-uri="mysql://127.0.0.1:3306/?max-txn-row=20&worker-number=8" --config=changefeed.toml
cdc cli changefeed resume -c test-cf --pd=http://10.0.10.25:2379

Currently, you can modify the following configuration items:

  • sink-uri of the changefeed.
  • The changefeed configuration file and all configuration items in the file.
  • Whether to use the file sorting feature and the sorting directory.
  • The target-ts of the changefeed.

Manage processing units of replication sub-tasks (processor)

  • Query the processor list:

    cdc cli processor list --pd=http://10.0.10.25:2379
    
    [
            {
                    "id": "9f84ff74-abf9-407f-a6e2-56aa35b33888",
                    "capture-id": "b293999a-4168-4988-a4f4-35d9589b226b",
                    "changefeed-id": "simple-replication-task"
            }
    ]
    
  • Query a specific changefeed which corresponds to the status of a specific replication task:

    cdc cli processor query --pd=http://10.0.10.25:2379 --changefeed-id=simple-replication-task --capture-id=b293999a-4168-4988-a4f4-35d9589b226b
    
    {
      "status": {
        "tables": {
          "56": {    # ID of the replication table, corresponding to tidb_table_id of a table in TiDB
            "start-ts": 417474117955485702
          }
        },
        "operation": null,
        "admin-job-type": 0
      },
      "position": {
        "checkpoint-ts": 417474143881789441,
        "resolved-ts": 417474143881789441,
        "count": 0
      }
    }
    

    In the command above:

    • status.tables: Each key number represents the ID of the replication table, corresponding to tidb_table_id of a table in TiDB.
    • resolved-ts: The largest TSO among the sorted data in the current processor.
    • checkpoint-ts: The largest TSO that has been successfully written to the downstream in the current processor.

Task configuration file

This section introduces the configuration of a replication task.

# Specifies whether the database names and tables in the configuration file are case-sensitive.
# The default value is true.
# This configuration item affects configurations related to filter and sink.
case-sensitive = true

# Specifies whether to output the old value. New in v4.0.5. Since v5.0, the default value is `true`.
enable-old-value = true

[filter]
# Ignores the transaction of specified start_ts.
ignore-txn-start-ts = [1, 2]

# Filter rules.
# Filter syntax: https://docs.pingcap.com/tidb/stable/table-filter#syntax.
rules = ['*.*', '!test.*']

# Event filter rules.
# The detailed syntax is described in the event filter rules section.
# The first event filter rule.
[[filter.event-filters]]
matcher = ["test.worker"] # matcher is an allow list, which means this rule only applies to the worker table in the test database.
ignore-event = ["insert"] # Ignore insert events.
ignore-sql = ["^drop", "add column"] # Ignore DDLs that start with "drop" or contain "add column".
ignore-delete-value-expr = "name = 'john'" # Ignore delete DMLs that contain the condition "name = 'john'".
ignore-insert-value-expr = "id >= 100" # Ignore insert DMLs that contain the condition "id >= 100".
ignore-update-old-value-expr = "age < 18" # Ignore update DMLs whose old value contains "age < 18".
ignore-update-new-value-expr = "gender = 'male'" # Ignore update DMLs whose new value contains "gender = 'male'".

# The second event filter rule.
matcher = ["test.fruit"] # matcher is an allow list, which means this rule only applies to the fruit table in the test database.
ignore-event = ["drop table"] # Ignore drop table events.
ignore-sql = ["delete"] # Ignore delete DMLs.
ignore-insert-value-expr = "price > 1000 and origin = 'no where'" # Ignore insert DMLs that contain the conditions "price > 1000" and "origin = 'no where'".

[sink]
# For the sink of MQ type, you can use dispatchers to configure the event dispatcher.
# Since v6.1, TiDB supports two types of event dispatchers: partition and topic. For more information, see the following section.
# The matching syntax of matcher is the same as the filter rule syntax. For details about the matcher rules, see the following section.

dispatchers = [
    {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "ts" },
    {matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value" },
    {matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"},
    {matcher = ['test6.*'], partition = "ts"}
]
# For the sink of MQ type, you can specify the protocol format of the message.
# Currently the following protocols are supported: canal-json, open-protocol, canal, avro, and maxwell.
protocol = "canal-json"

Event filter rules New in v6.2.0

Starting in v6.2.0, TiCDC supports event filter. You can configure event filter rules to filter out the DML and DDL events that meet the specified conditions.

The following is an example of event filter rules:

[filter]
# The event filter rules must be under the `[filter]` configuration. You can configure multiple event filters at the same time.

[[filter.event-filters]]
matcher = ["test.worker"] # matcher is an allow list, which means this rule only applies to the worker table in the test database.
ignore-event = ["insert"] # Ignore insert events.
ignore-sql = ["^drop", "add column"] # Ignore DDLs that start with "drop" or contain "add column".
ignore-delete-value-expr = "name = 'john'" # Ignore delete DMLs that contain the condition "name = 'john'".
ignore-insert-value-expr = "id >= 100" # Ignore insert DMLs that contain the condition "id >= 100".
ignore-update-old-value-expr = "age < 18 or name = 'lili'" # Ignore update DMLs whose old value contains "age < 18" or "name = 'lili'".
ignore-update-new-value-expr = "gender = 'male' and age > 18" # Ignore update DMLs whose new value contains "gender = 'male'" and "age > 18".

The event filter rules must be under the [filter] configuration. For detailed configuration, refer to Task configuration file.

Description of configuration parameters :

  • matcher: the database and table that this event filter rule applies to. The syntax is the same as table filter.
  • ignore-event: the event type to be ignored. This parameter accepts an array of strings. You can configure multiple event types. Currently, the following event types are supported:
EventTypeAliasDescription
all dmlMatches all DML events
all ddlMatches all DDL events
insertDMLMatches insert DML event
updateDMLMatches update DML event
deleteDMLMatches delete DML event
create schemaDDLcreate databaseMatches create database event
drop schemaDDLdrop databaseMatches drop database event
create tableDDLMatches create table event
drop tableDDLMatches drop table event
rename tableDDLMatches rename table event
truncate tableDDLMatches truncate table event
alter tableDDLMatches alter table event, including all clauses of alter table, create index and drop index
add table partitionDDLMatches add table partition event
drop table partitionDDLMatches drop table partition event
truncate table partitionDDLMatches truncate table partition event
create viewDDLMatches create viewevent
drop viewDDLMatches drop view event
  • ignore-sql: the DDL statements to be ignored. This parameter accepts an array of strings, in which you can configure multiple regular expressions. This rule only applies to DDL events.
  • ignore-delete-value-expr: this parameter accepts a SQL expression. This rule only applies to delete DML events with the specified value.
  • ignore-insert-value-expr: this parameter accepts a SQL expression. This rule only applies to insert DML events with the specified value.
  • ignore-update-old-value-expr: this parameter accepts a SQL expression. This rule only applies to update DML events whose old value contains the specified value.
  • ignore-update-new-value-expr: this parameter accepts a SQL expression. This rule only applies to update DML events whose new value contains the specified value.
Note
  • When TiDB updates a value in the column of the clustered index, TiDB splits an UPDATE event into a DELETE event and an INSERT event. TiCDC does not identify such events as an UPDATE event and thus cannot correctly filter out such events.
  • When you configure a SQL expression, make sure all tables that matches matcher contain all the columns specified in the SQL expression. Otherwise, the replication task cannot be created. In addition, if the table schema changes during the replication, which results in a table no longer containing a required column, the replication task fails and cannot be resumed automatically. In such a situation, you must manually modify the configuration and resume the task.

Notes for compatibility

  • In TiCDC v4.0.0, ignore-txn-commit-ts is removed and ignore-txn-start-ts is added, which uses start_ts to filter transactions.
  • In TiCDC v4.0.2, db-dbs/db-tables/ignore-dbs/ignore-tables are removed and rules is added, which uses new filter rules for databases and tables. For detailed filter syntax, see Table Filter.
  • In TiCDC v6.1.0, mounter is removed. If you configure mounter, TiCDC does not report an error, but the configuration does not take effect.

Customize the rules for Topic and Partition dispatchers of Kafka Sink

Matcher rules

In the example of the previous section:

  • For the tables that match the matcher rule, they are dispatched according to the policy specified by the corresponding topic expression. For example, the test3.aa table is dispatched according to "Topic expression 2"; the test5.aa table is dispatched according to "Topic expression 3".
  • For a table that matches multiple matcher rules, it is dispatched according to the first matching topic expression. For example, the test1.aa table is distributed according to "Topic expression 1".
  • For tables that do not match any matcher rule, the corresponding data change events are sent to the default topic specified in --sink-uri. For example, the test10.aa table is sent to the default topic.
  • For tables that match the matcher rule but do not specify a topic dispatcher, the corresponding data changes are sent to the default topic specified in --sink-uri. For example, the test6.aa table is sent to the default topic.

Topic dispatchers

You can use topic = "xxx" to specify a Topic dispatcher and use topic expressions to implement flexible topic dispatching policies. It is recommended that the total number of topics be less than 1000.

The format of the Topic expression is [prefix]{schema}[middle][{table}][suffix].

  • prefix: optional. Indicates the prefix of the Topic Name.
  • {schema}: required. Used to match the schema name.
  • middle: optional. Indicates the delimiter between schema name and table name.
  • {table}: optional. Used to match the table name.
  • suffix: optional. Indicates the suffix of the Topic Name.

prefix, middle and suffix can only include the following characters: a-z, A-Z, 0-9, ., _ and -. {schema} and {table} are both lowercase. Placeholders such as {Schema} and {TABLE} are invalid.

Some examples:

  • matcher = ['test1.table1', 'test2.table2'], topic = "hello_{schema}_{table}"
    • The data change events corresponding to test1.table1 are sent to the topic named hello_test1_table1.
    • The data change events corresponding to test2.table2 are sent to the topic named hello_test2_table2.
  • matcher = ['test3.*', 'test4.*'], topic = "hello_{schema}_world"
    • The data change events corresponding to all tables in test3 are sent to the topic named hello_test3_world.
    • The data change events corresponding to all tables in test4 are sent to the topic named hello_test4_world.
  • matcher = ['*.*'], topic = "{schema}_{table}"
    • All tables listened by TiCDC are dispatched to separate topics according to the "schema_table" rule. For example, for the test.account table, TiCDC dispatches its data change log to a Topic named test_account.

Dispatch DDL events

Schema-level DDLs

DDLs that are not related to a specific table are called schema-level DDLs, such as create database and drop database. The events corresponding to schema-level DDLs are sent to the default topic specified in --sink-uri.

Table-level DDLs

DDLs that are related to a specific table are called table-level DDLs, such as alter table and create table. The events corresponding to table-level DDLs are sent to the corresponding topic according to dispatcher configurations.

For example, for a dispatcher like matcher = ['test.*'], topic = {schema}_{table}, DDL events are dispatched as follows:

  • If a single table is involved in the DDL event, the DDL event is sent to the corresponding topic as is. For example, for the DDL event drop table test.table1, the event is sent to the topic named test_table1.
  • If multiple tables are involved in the DDL event (rename table / drop table / drop view may involve multiple tables), the DDL event is split into multiple events and sent to the corresponding topics. For example, for the DDL event rename table test.table1 to test.table10, test.table2 to test.table20, the event rename table test.table1 to test.table10 is sent to the topic named test_table1 and the event rename table test.table2 to test.table20 is sent to the topic named test.table2.

Partition dispatchers

You can use partition = "xxx" to specify a partition dispatcher. It supports four dispatchers: default, ts, index-value, and table. The dispatcher rules are as follows:

  • default: When multiple unique indexes (including the primary key) exist or the Old Value feature is enabled, events are dispatched in the table mode. When only one unique index (or the primary key) exists, events are dispatched in the index-value mode.
  • ts: Use the commitTs of the row change to hash and dispatch events.
  • index-value: Use the value of the primary key or the unique index of the table to hash and dispatch events.
  • table: Use the schema name of the table and the table name to hash and dispatch events.
Note

Since v6.1, to clarify the meaning of the configuration, the configuration used to specify the partition dispatcher has been changed from dispatcher to partition, with partition being an alias for dispatcher. For example, the following two rules are exactly equivalent.

[sink]
dispatchers = [
   {matcher = ['*.*'], dispatcher = "ts"},
   {matcher = ['*.*'], partition = "ts"},
]

However, dispatcher and partition cannot appear in the same rule. For example, the following rule is invalid.

{matcher = ['*.*'], dispatcher = "ts", partition = "table"},

Output the historical value of a Row Changed Event New in v4.0.5

In the default configuration, the Row Changed Event of TiCDC Open Protocol output in a replication task only contains the changed value, not the value before the change. Therefore, the output value cannot be used by the consumer ends of TiCDC Open Protocol as the historical value of a Row Changed Event.

Starting from v4.0.5, TiCDC supports outputting the historical value of a Row Changed Event. To enable this feature, specify the following configuration in the changefeed configuration file at the root level:

enable-old-value = true

This feature is enabled by default since v5.0. To learn the output format of the TiCDC Open Protocol after this feature is enabled, see TiCDC Open Protocol - Row Changed Event.

Replicate tables with the new framework for collations enabled

Starting from v4.0.15, v5.0.4, v5.1.1 and v5.2.0, TiCDC supports tables that have enabled new framework for collations.

Replicate tables without a valid index

Since v4.0.8, TiCDC supports replicating tables that have no valid index by modifying the task configuration. To enable this feature, configure in the changefeed configuration file as follows:

enable-old-value = true
force-replicate = true
Warning

For tables without a valid index, operations such as INSERT and REPLACE are not reentrant, so there is a risk of data redundancy. TiCDC guarantees that data is distributed only at least once during the replication process. Therefore, enabling this feature to replicate tables without a valid index will definitely cause data redundancy. If you do not accept data redundancy, it is recommended to add an effective index, such as adding a primary key column with the AUTO RANDOM attribute.

Unified Sorter

Unified sorter is the sorting engine in TiCDC. It can mitigate OOM problems caused by the following scenarios:

  • The data replication task in TiCDC is paused for a long time, during which a large amount of incremental data is accumulated and needs to be replicated.
  • The data replication task is started from an early timestamp so it becomes necessary to replicate a large amount of incremental data.

For the changefeeds created using cdc cli after v4.0.13, Unified Sorter is enabled by default; for the changefeeds that have existed before v4.0.13, the previous configuration is used.

To check whether or not the Unified Sorter feature is enabled on a changefeed, you can execute the following example command (assuming the IP address of the PD instance is http://10.0.10.25:2379):

cdc cli --pd="http://10.0.10.25:2379" changefeed query --changefeed-id=simple-replication-task | grep 'sort-engine'

In the output of the above command, if the value of sort-engine is "unified", it means that Unified Sorter is enabled on the changefeed.

Note
  • If your servers use mechanical hard drives or other storage devices that have high latency or limited bandwidth, use the unified sorter with caution.
  • By default, Unified Sorter uses data_dir to store temporary files. It is recommended to ensure that the free disk space is greater than or equal to 500 GiB. For production environments, it is recommended to ensure that the free disk space on each node is greater than (the maximum checkpoint-ts delay allowed by the business) * (upstream write traffic at business peak hours). In addition, if you plan to replicate a large amount of historical data after changefeed is created, make sure that the free space on each node is greater than the amount of replicated data.
  • Unified sorter is enabled by default. If your servers do not match the above requirements and you want to disable the unified sorter, you need to manually set sort-engine to memory for the changefeed.
  • To enable Unified Sorter on an existing changefeed that uses memory to sort, see the methods provided in How do I handle the OOM that occurs after TiCDC is restarted after a task interruption?.

Eventually consistent replication in disaster scenarios

Warning

Currently, it is not recommended that you use eventually consistent replication in disaster scenarios. For details, see critical bug #6189.

Starting from v5.3.0, TiCDC supports backing up incremental data from an upstream TiDB cluster to S3 storage or an NFS file system of a downstream cluster. When the upstream cluster encounters a disaster and becomes unavailable, TiCDC can restore the downstream data to the recent eventually consistent state. This is the eventually consistent replication capability provided by TiCDC. With this capability, you can switch applications to the downstream cluster quickly, avoiding long-time downtime and improving service continuity.

Currently, TiCDC can replicate incremental data from a TiDB cluster to another TiDB cluster or a MySQL-compatible database system (including Aurora, MySQL, and MariaDB). In case the upstream cluster crashes, TiCDC can restore data in the downstream cluster within 5 minutes, given the conditions that before the disaster the replication status of TiCDC is normal and the replication lag is small. It allows data loss of 10s at most, that is, RTO <= 5 min, and P95 RPO <= 10s.

TiCDC replication lag increases in the following scenarios:

  • The TPS increases significantly in a short time
  • Large or long transactions occur in the upstream
  • The TiKV or TiCDC cluster in the upstream is reloaded or upgraded
  • Time-consuming DDL statements, such as add index, are executed in the upstream
  • The PD is configured with aggressive scheduling strategies, resulting in frequent transfer of Region leaders, or frequent Region merge or Region split

Prerequisites

  • Prepare a highly available Amazon S3 storage or NFS system for storing TiCDC's real-time incremental data backup files. These files can be accessed in case of an primary cluster disaster.
  • Enable this feature for changefeeds that need to have eventual consistency in disaster scenarios. To enable it, you can add the following configuration to the changefeed configuration file.
[consistent]
# Consistency level. Options include:
# - none: the default value. In a non-disaster scenario, eventual consistency is only guaranteed if and only if finished-ts is specified.
# - eventual: Uses redo log to guarantee eventual consistency in case of the primary cluster disasters.
level = "eventual"

# Individual redo log file size, in MiB. By default, it's 64. It is recommended to be no more than 128.
max-log-size = 64

# The interval for flushing or uploading redo logs to S3, in milliseconds. By default, it's 1000. The recommended range is 500-2000.
flush-interval = 1000

# Form of storing redo log, including nfs (NFS directory) and S3 (uploading to S3).
storage = "s3://logbucket/test-changefeed?endpoint=http://$S3_ENDPOINT/"

Disaster recovery

When a disaster happens in the primary cluster, you need to recover manually in the secondary cluster by running the cdc redo command. The recovery process is as follows.

  1. Ensure that all the TiCDC processes have exited. This is to prevent the primary cluster from resuming service during data recovery and prevent TiCDC from restarting data synchronization.
  2. Use cdc binary for data recovery. Run the following command:
cdc redo apply --tmp-dir="/tmp/cdc/redo/apply" \
    --storage="s3://logbucket/test-changefeed?endpoint=http://10.0.10.25:24927/" \
    --sink-uri="mysql://normal:123456@10.0.10.55:3306/"

In this command:

  • tmp-dir: Specifies the temporary directory for downloading TiCDC incremental data backup files.
  • storage: Specifies the address for storing the TiCDC incremental data backup files, either an Amazon S3 storage or an NFS directory.
  • sink-uri: Specifies the secondary cluster address to restore the data to. Scheme can only be mysql.
Was this page helpful?