package org.apache.flink.table.planner.runtime.stream.sql;

import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Predef$;
import scala.StringContext;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering$String$;
import scala.reflect.ScalaSignature;

/* compiled from: ChangelogSourceITCase.scala */
@RunWith(Parameterized.class)
@ScalaSignature(bytes = "\u0006\u0001]4A!\u0001\u0002\u0001'\t)2\t[1oO\u0016dwnZ*pkJ\u001cW-\u0013+DCN,'BA\u0002\u0005\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u000b\u0019\taa\u001d;sK\u0006l'BA\u0004\t\u0003\u001d\u0011XO\u001c;j[\u0016T!!\u0003\u0006\u0002\u000fAd\u0017M\u001c8fe*\u00111\u0002D\u0001\u0006i\u0006\u0014G.\u001a\u0006\u0003\u001b9\tQA\u001a7j].T!a\u0004\t\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\t\u0012aA8sO\u000e\u00011C\u0001\u0001\u0015!\t)\u0002$D\u0001\u0017\u0015\t9b!A\u0003vi&d7/\u0003\u0002\u001a-\tQ2\u000b\u001e:fC6LgnZ,ji\"\u001cF/\u0019;f)\u0016\u001cHOQ1tK\"A1\u0004\u0001B\u0001B\u0003%A$A\u0003ti\u0006$X\r\u0005\u0002\u001ec9\u0011ad\f\b\u0003?9r!\u0001I\u0017\u000f\u0005\u0005bcB\u0001\u0012,\u001d\t\u0019#F\u0004\u0002%S9\u0011Q\u0005K\u0007\u0002M)\u0011qEE\u0001\u0007yI|w\u000e\u001e \n\u0003EI!a\u0004\t\n\u00055q\u0011BA\u0006\r\u0013\tI!\"\u0003\u0002\b\u0011%\u0011qCB\u0005\u0003aY\t!d\u0015;sK\u0006l\u0017N\\4XSRD7\u000b^1uKR+7\u000f\u001e\"bg\u0016L!AM\u001a\u0003!M#\u0018\r^3CC\u000e\\WM\u001c3N_\u0012,'B\u0001\u0019\u0017\u0011\u0015)\u0004\u0001\"\u00017\u0003\u0019a\u0014N\\5u}Q\u0011q'\u000f\t\u0003q\u0001i\u0011A\u0001\u0005\u00067Q\u0002\r\u0001\b\u0005\bw\u0001\u0011\r\u0011\"\u0001=\u0003\u0019!\u0017\r^1JIV\tQ\b\u0005\u0002?\t:\u0011qHQ\u0007\u0002\u0001*\t\u0011)A\u0003tG\u0006d\u0017-\u0003\u0002D\u0001\u00061\u0001K]3eK\u001aL!!\u0012$\u0003\rM#(/\u001b8h\u0015\t\u0019\u0005\t\u0003\u0004I\u0001\u0001\u0006I!P\u0001\bI\u0006$\u0018-\u00133!\u0011\u0015Q\u0005\u0001\"\u0011L\u0003\u0019\u0011WMZ8sKR\tA\n\u0005\u0002@\u001b&\u0011a\n\u0011\u0002\u0005+:LG\u000f\u000b\u0002J!B\u0011\u0011\u000bV\u0007\u0002%*\u00111\u000bE\u0001\u0006UVt\u0017\u000e^\u0005\u0003+J\u0013aAQ3g_J,\u0007\"B,\u0001\t\u0003Y\u0015!\n;fgR\u001c\u0005.\u00198hK2|wmU8ve\u000e,\u0017I\u001c3U_J+GO]1diN#(/Z1nQ\t1\u0016\f\u0005\u0002R5&\u00111L\u0015\u0002\u0005)\u0016\u001cH\u000fC\u0003^\u0001\u0011\u00051*\u0001\u0011uKN$8\t[1oO\u0016dwnZ*pkJ\u001cW-\u00118e+B\u001cXM\u001d;TS:\\\u0007F\u0001/Z\u0011\u0015\u0001\u0007\u0001\"\u0001L\u0003y!Xm\u001d;BO\u001e\u0014XmZ1uK>s7\t[1oO\u0016dwnZ*pkJ\u001cW\r\u000b\u0002`3\")1\r\u0001C\u0001\u0017\u0006YC/Z:u\u0003\u001e<'/Z4bi\u0016|en\u00115b]\u001e,Gn\\4T_V\u00148-Z!oIV\u00038/\u001a:u'&t7\u000e\u000b\u0002c3\")a\r\u0001C\u0001\u0017\u0006QC/Z:u\u0003\u001e<'/Z4bi\u0016|e.\u00138tKJ$H)\u001a7fi\u0016\u001c\u0005.\u00198hK2|wmU8ve\u000e,\u0007FA3ZQ\u0011\u0001\u0011n\u001c9\u0011\u0005)lW\"A6\u000b\u00051\u0014\u0016A\u0002:v]:,'/\u0003\u0002oW\n9!+\u001e8XSRD\u0017!\u0002<bYV,7%A9\u0011\u0005I,X\"A:\u000b\u0005Q\u0014\u0016a\u0002:v]:,'o]\u0005\u0003mN\u0014Q\u0002U1sC6,G/\u001a:ju\u0016$\u0007")
/* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.class */
public class ChangelogSourceITCase extends StreamingWithStateTestBase {
    private final String dataId;

    public String dataId() {
        return this.dataId;
    }

    @Override // org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase, org.apache.flink.table.planner.runtime.utils.StreamingTestBase
    @Before
    public void before() {
        super.before();
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE user_logs (\n         |  user_id STRING,\n         |  user_name STRING,\n         |  email STRING,\n         |  balance DECIMAL(18,2),\n         |  balance2 AS balance * 2\n         |) WITH (\n         | 'connector' = 'values',\n         | 'data-id' = '", "',\n         | 'changelog-mode' = 'I,UA,UB,D'\n         |)\n         |"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{dataId()})))).stripMargin());
    }

    @Test
    public void testChangelogSourceAndToRetractStream() {
        DataStream retractStream = package$.MODULE$.tableConversions(tEnv().sqlQuery("SELECT * FROM user_logs")).toRetractStream(TypeExtractor.createTypeInfo(Row.class));
        TestingRetractSink testingRetractSink = new TestingRetractSink();
        retractStream.addSink(testingRetractSink).setParallelism(retractStream.parallelism());
        env().execute();
        Assert.assertEquals(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"user1,Tom,tom123@gmail.com,8.10,16.20", "user3,Bailey,bailey@qq.com,9.99,19.98", "user4,Tina,tina@gmail.com,11.30,22.60"})).sorted(Ordering$String$.MODULE$), testingRetractSink.getRetractResults().sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testChangelogSourceAndUpsertSink() {
        String stripMargin = new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE user_sink (\n         |  user_id STRING PRIMARY KEY NOT ENFORCED,\n         |  user_name STRING,\n         |  email STRING,\n         |  balance DECIMAL(18,2),\n         |  balance2 DECIMAL(18,2)\n         |) WITH (\n         | 'connector' = 'values',\n         | 'sink-insert-only' = 'false'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin();
        String stripMargin2 = new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |INSERT INTO user_sink\n         |SELECT * FROM user_logs\n         |"})).s(Nil$.MODULE$))).stripMargin();
        tEnv().executeSql(stripMargin);
        execInsertSqlAndWaitResult(stripMargin2);
        Assert.assertEquals(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"user1,Tom,tom123@gmail.com,8.10,16.20", "user3,Bailey,bailey@qq.com,9.99,19.98", "user4,Tina,tina@gmail.com,11.30,22.60"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("user_sink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testAggregateOnChangelogSource() {
        DataStream retractStream = package$.MODULE$.tableConversions(tEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |SELECT count(*), sum(balance), max(email)\n         |FROM user_logs\n         |"})).s(Nil$.MODULE$))).stripMargin())).toRetractStream(TypeExtractor.createTypeInfo(Row.class));
        TestingRetractSink testingRetractSink = new TestingRetractSink();
        retractStream.addSink(testingRetractSink).setParallelism(retractStream.parallelism());
        env().execute();
        Assert.assertEquals(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"3,29.39,tom123@gmail.com"})).sorted(Ordering$String$.MODULE$), testingRetractSink.getRetractResults().sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testAggregateOnChangelogSourceAndUpsertSink() {
        String stripMargin = new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE user_sink (\n         |  `scope` STRING,\n         |  cnt BIGINT,\n         |  sum_balance DECIMAL(18,2),\n         |  max_email STRING,\n         |  PRIMARY KEY (`scope`) NOT ENFORCED\n         |) WITH (\n         | 'connector' = 'values',\n         | 'sink-insert-only' = 'false'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin();
        String stripMargin2 = new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |INSERT INTO user_sink\n         |SELECT 'ALL', count(*), sum(balance), max(email)\n         |FROM user_logs\n         |GROUP BY 'ALL'\n         |"})).s(Nil$.MODULE$))).stripMargin();
        tEnv().executeSql(stripMargin);
        execInsertSqlAndWaitResult(stripMargin2);
        Assert.assertEquals(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"ALL,3,29.39,tom123@gmail.com"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("user_sink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testAggregateOnInsertDeleteChangelogSource() {
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE user_logs2 (\n         |  user_id STRING,\n         |  user_name STRING,\n         |  email STRING,\n         |  balance DECIMAL(18,2)\n         |) WITH (\n         | 'connector' = 'values',\n         | 'data-id' = '", "',\n         | 'changelog-mode' = 'I,D'\n         |)\n         |"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{TestValuesTableFactory.registerData((Seq<Row>) TestData$.MODULE$.userChangelog().map(new ChangelogSourceITCase$$anonfun$1(this), Seq$.MODULE$.canBuildFrom()))})))).stripMargin());
        DataStream retractStream = package$.MODULE$.tableConversions(tEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |SELECT count(*), sum(balance), max(email)\n         |FROM user_logs2\n         |"})).s(Nil$.MODULE$))).stripMargin())).toRetractStream(TypeExtractor.createTypeInfo(Row.class));
        TestingRetractSink testingRetractSink = new TestingRetractSink();
        retractStream.addSink(testingRetractSink).setParallelism(retractStream.parallelism());
        env().execute();
        Assert.assertEquals(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"3,29.39,tom123@gmail.com"})).sorted(Ordering$String$.MODULE$), testingRetractSink.getRetractResults().sorted(Ordering$String$.MODULE$));
    }

    public ChangelogSourceITCase(StreamingWithStateTestBase.StateBackendMode stateBackendMode) {
        super(stateBackendMode);
        this.dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.userChangelog());
    }
}
