package org.apache.flink.table.planner.plan.stream.sql.join;

import java.util.Collection;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.plan.utils.InvalidAsyncTableFunctionEvalSignature1;
import org.apache.flink.table.planner.plan.utils.InvalidAsyncTableFunctionEvalSignature2;
import org.apache.flink.table.planner.plan.utils.InvalidAsyncTableFunctionEvalSignature3;
import org.apache.flink.table.planner.plan.utils.InvalidAsyncTableFunctionResultType;
import org.apache.flink.table.planner.plan.utils.InvalidTableFunctionEvalSignature1;
import org.apache.flink.table.planner.plan.utils.InvalidTableFunctionResultType;
import org.apache.flink.table.planner.plan.utils.ValidAsyncTableFunction;
import org.apache.flink.table.planner.plan.utils.ValidTableFunction;
import org.apache.flink.table.planner.plan.utils.ValidTableFunction2;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.MatchError;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Symbol;
import scala.Symbol$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: LookupJoinTest.scala */
@RunWith(Parameterized.class)
@ScalaSignature(bytes = "\u0006\u0001\u0005\rh\u0001B\u0001\u0003\u0001U\u0011a\u0002T8pWV\u0004(j\\5o)\u0016\u001cHO\u0003\u0002\u0004\t\u0005!!n\\5o\u0015\t)a!A\u0002tc2T!a\u0002\u0005\u0002\rM$(/Z1n\u0015\tI!\"\u0001\u0003qY\u0006t'BA\u0006\r\u0003\u001d\u0001H.\u00198oKJT!!\u0004\b\u0002\u000bQ\f'\r\\3\u000b\u0005=\u0001\u0012!\u00024mS:\\'BA\t\u0013\u0003\u0019\t\u0007/Y2iK*\t1#A\u0002pe\u001e\u001c\u0001aE\u0002\u0001-q\u0001\"a\u0006\u000e\u000e\u0003aQ!!\u0007\u0006\u0002\u000bU$\u0018\u000e\\:\n\u0005mA\"!\u0004+bE2,G+Z:u\u0005\u0006\u001cX\r\u0005\u0002\u001eA5\taDC\u0001 \u0003\u0015\u00198-\u00197b\u0013\t\tcD\u0001\u0007TKJL\u0017\r\\5{C\ndW\r\u0003\u0005$\u0001\t\u0005\t\u0015!\u0003%\u0003EaWmZ1dsR\u000b'\r\\3T_V\u00148-\u001a\t\u0003;\u0015J!A\n\u0010\u0003\u000f\t{w\u000e\\3b]\")\u0001\u0006\u0001C\u0001S\u00051A(\u001b8jiz\"\"A\u000b\u0017\u0011\u0005-\u0002Q\"\u0001\u0002\t\u000b\r:\u0003\u0019\u0001\u0013\t\u000f9\u0002!\u0019!C\u0005_\u0005!Q\u000f^5m+\u0005\u0001\u0004CA\f2\u0013\t\u0011\u0004DA\nTiJ,\u0017-\u001c+bE2,G+Z:u+RLG\u000e\u0003\u00045\u0001\u0001\u0006I\u0001M\u0001\u0006kRLG\u000e\t\u0005\u0006m\u0001!\taN\u0001\u0007E\u00164wN]3\u0015\u0003a\u0002\"!H\u001d\n\u0005ir\"\u0001B+oSRD#!\u000e\u001f\u0011\u0005u\u0002U\"\u0001 
\u000b\u0005}\u0012\u0012!\u00026v]&$\u0018BA!?\u0005\u0019\u0011UMZ8sK\")1\t\u0001C\u0001o\u0005\u0001C/Z:u\u0015>Lg.\u00138wC2LGMS8j]R+W\u000e]8sC2$\u0016M\u00197fQ\t\u0011U\t\u0005\u0002>\r&\u0011qI\u0010\u0002\u0005)\u0016\u001cH\u000fC\u0003J\u0001\u0011\u0005q'\u0001\u0012uKN$hj\u001c;ESN$\u0018N\\2u\rJ|W.\u00138K_&t7i\u001c8eSRLwN\u001c\u0015\u0003\u0011\u0016CQ\u0001\u0014\u0001\u0005\u0002]\na\u0004^3ti&sg/\u00197jI2{wn[;q)\u0006\u0014G.\u001a$v]\u000e$\u0018n\u001c8)\u0005-+\u0005\"B(\u0001\t\u00039\u0014a\u0007;fgRTu.\u001b8P]\u0012KgMZ3sK:$8*Z=UsB,7\u000f\u000b\u0002O\u000b\")!\u000b\u0001C\u0001o\u0005yB/Z:u\u0015>Lg.\u00138wC2LGMT8o)\u0016l\u0007o\u001c:bYR\u000b'\r\\3)\u0005E+\u0005\"B+\u0001\t\u00039\u0014!\u0006;fgRTu.\u001b8UK6\u0004xN]1m)\u0006\u0014G.\u001a\u0015\u0003)\u0016CQ\u0001\u0017\u0001\u0005\u0002]\n\u0011\u0004^3ti2+g\r\u001e&pS:$V-\u001c9pe\u0006dG+\u00192mK\"\u0012q+\u0012\u0005\u00067\u0002!\taN\u0001%i\u0016\u001cHOS8j]R+W\u000e]8sC2$\u0016M\u00197f/&$\bNT3ti\u0016$\u0017+^3ss\"\u0012!,\u0012\u0005\u0006=\u0002!\taN\u0001,i\u0016\u001cHOS8j]R+W\u000e]8sC2$\u0016M\u00197f/&$\b\u000e\u0015:pU\u0016\u001cG/[8o!V\u001c\b\u000eR8x]\"\u0012Q,\u0012\u0005\u0006C\u0002!\taN\u0001(i\u0016\u001cHOS8j]R+W\u000e]8sC2$\u0016M\u00197f/&$\bNR5mi\u0016\u0014\b+^:i\t><h\u000e\u000b\u0002a\u000b\")A\r\u0001C\u0001o\u0005)C/Z:u\u0015>Lg\u000eV3na>\u0014\u0018\r\u001c+bE2,w+\u001b;i\u0007\u0006d7\rU;tQ\u0012{wO\u001c\u0015\u0003G\u0016CQa\u001a\u0001\u0005\u0002]\n\u0011\u0006^3ti*{\u0017N\u001c+f[B|'/\u00197UC\ndWmV5uQ6+H\u000e^5J]\u0012,\u0007pQ8mk6t\u0007F\u00014F\u0011\u0015Q\u0007\u0001\"\u00018\u0003i!Xm\u001d;Bm>LG-Q4he\u0016<\u0017\r^3QkNDGi\\<oQ\tIW\tC\u0003n\u0001\u0011\u0005q'\u0001\u0014uKN$(j\\5o)\u0016l\u0007o\u001c:bYR\u000b'\r\\3XSRDGK];f\u0007>tG-\u001b;j_:D#\u0001\\#\t\u000bA\u0004A\u0011A\u001c\u0002kQ,7\u000f\u001e&pS:$V-\u001c9pe\u0006dG+\u00192mK^KG\u000f\u001b$v]\u000e$\u0018n\u001c8B]\u0012\u001cuN\\:uC:$8i\u001c8eSRLwN\u001c\u
0015\u0003_\u0016CQa\u001d\u0001\u0005\u0002]\n!\b^3ti*{\u0017N\u001c+f[B|'/\u00197UC\ndWmV5uQ6+H\u000e^5Gk:\u001cG/[8o\u0003:$7i\u001c8ti\u0006tGoQ8oI&$\u0018n\u001c8)\u0005I,\u0005\"\u0002<\u0001\t\u00039\u0014A\u000e;fgRTu.\u001b8UK6\u0004xN]1m)\u0006\u0014G.Z,ji\"4UO\\2uS>t\u0017I\u001c3SK\u001a,'/\u001a8dK\u000e{g\u000eZ5uS>t\u0007FA;F\u0011\u0015I\b\u0001\"\u0003{\u0003E\u0019'/Z1uK2{wn[;q)\u0006\u0014G.\u001a\u000b\u0005qm\fI\u0001C\u0003}q\u0002\u0007Q0A\u0005uC\ndWMT1nKB\u0019a0a\u0001\u000f\u0005uy\u0018bAA\u0001=\u00051\u0001K]3eK\u001aLA!!\u0002\u0002\b\t11\u000b\u001e:j]\u001eT1!!\u0001\u001f\u0011\u001d\tY\u0001\u001fa\u0001\u0003\u001b\ta\u0002\\8pWV\u0004h)\u001e8di&|g\u000e\u0005\u0003\u0002\u0010\u0005UQBAA\t\u0015\r\t\u0019\u0002D\u0001\nMVt7\r^5p]NLA!a\u0006\u0002\u0012\t\u0019Rk]3s\t\u00164\u0017N\\3e\rVt7\r^5p]\"9\u00111\u0004\u0001\u0005\n\u0005u\u0011!F3ya\u0016\u001cG/\u0012=dKB$\u0018n\u001c8UQJ|wO\u001c\u000b\bq\u0005}\u0011\u0011EA\u0013\u0011\u0019)\u0011\u0011\u0004a\u0001{\"9\u00111EA\r\u0001\u0004i\u0018\u0001C6fs^|'\u000fZ:\t\u0015\u0005\u001d\u0012\u0011\u0004I\u0001\u0002\u0004\tI#A\u0003dY\u0006T(\u0010\r\u0003\u0002,\u0005U\u0002#\u0002@\u0002.\u0005E\u0012\u0002BA\u0018\u0003\u000f\u0011Qa\u00117bgN\u0004B!a\r\u000261\u0001A\u0001DA\u001c\u0003K\t\t\u0011!A\u0003\u0002\u0005e\"aA0%cE!\u00111HA!!\ri\u0012QH\u0005\u0004\u0003\u007fq\"a\u0002(pi\"Lgn\u001a\t\u0005\u0003\u0007\n\u0019F\u0004\u0003\u0002F\u0005=c\u0002BA$\u0003\u001bj!!!\u0013\u000b\u0007\u0005-C#\u0001\u0004=e>|GOP\u0005\u0002?%\u0019\u0011\u0011\u000b\u0010\u0002\u000fA\f7m[1hK&!\u0011QKA,\u0005%!\u0006N]8xC\ndWMC\u0002\u0002RyAq!a\u0017\u0001\t\u0013\ti&\u0001\rwKJLg-\u001f+sC:\u001cH.\u0019;j_:\u001cVoY2fgN$2\u0001OA0\u0011\u0019)\u0011\u0011\fa\u0001{\"I\u00111\r\u0001\u0012\u0002\u0013%\u0011QM\u0001 
Kb\u0004Xm\u0019;Fq\u000e,\u0007\u000f^5p]RC'o\\<oI\u0011,g-Y;mi\u0012\u001aTCAA4a\u0011\tI'!\u001c\u0011\u000by\fi#a\u001b\u0011\t\u0005M\u0012Q\u000e\u0003\r\u0003o\t\t'!A\u0001\u0002\u000b\u0005\u0011\u0011\b\u0015\b\u0001\u0005E\u0014QPA@!\u0011\t\u0019(!\u001f\u000e\u0005\u0005U$bAA<}\u00051!/\u001e8oKJLA!a\u001f\u0002v\t9!+\u001e8XSRD\u0017!\u0002<bYV,7EAAA!\u0011\t\u0019)!#\u000e\u0005\u0005\u0015%bAAD}\u00059!/\u001e8oKJ\u001c\u0018\u0002BAF\u0003\u000b\u0013Q\u0002U1sC6,G/\u001a:ju\u0016$waBAH\u0005!\u0005\u0011\u0011S\u0001\u000f\u0019>|7.\u001e9K_&tG+Z:u!\rY\u00131\u0013\u0004\u0007\u0003\tA\t!!&\u0014\u000b\u0005M\u0015q\u0013\u000f\u0011\u0007u\tI*C\u0002\u0002\u001cz\u0011a!\u00118z%\u00164\u0007b\u0002\u0015\u0002\u0014\u0012\u0005\u0011q\u0014\u000b\u0003\u0003#C\u0001\"a)\u0002\u0014\u0012\u0005\u0011QU\u0001\u000ba\u0006\u0014\u0018-\\3uKJ\u001cHCAAT!\u0019\tI+!-\u000266\u0011\u00111\u0016\u0006\u0004]\u00055&BAAX\u0003\u0011Q\u0017M^1\n\t\u0005M\u00161\u0016\u0002\u000b\u0007>dG.Z2uS>t\u0007#B\u000f\u00028\u0006m\u0016bAA]=\t)\u0011I\u001d:bsB!\u0011QXAb\u001b\t\tyL\u0003\u0003\u0002B\u00065\u0016\u0001\u00027b]\u001eLA!!2\u0002@\n1qJ\u00196fGRD\u0003\"!)\u0002J\u0006]\u0017\u0011\u001c\t\u0005\u0003\u0017\f\tN\u0004\u0003\u0002\u0004\u00065\u0017\u0002BAh\u0003\u000b\u000bQ\u0002U1sC6,G/\u001a:ju\u0016$\u0017\u0002BAj\u0003+\u0014!\u0002U1sC6,G/\u001a:t\u0015\u0011\ty-!\"\u0002\t9\fW.Z\u0011\u0003\u00037\fQ\u0003T3hC\u000eLH+\u00192mKN{WO]2f{m\u0004T\u0010\u0003\u0006\u0002`\u0006M\u0015\u0011!C\u0005\u0003C\f1B]3bIJ+7o\u001c7wKR\u0011\u00111\u0018")
/* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.class */
public class LookupJoinTest extends TableTestBase implements Serializable {
    /**
     * Constructor-supplied switch. When {@code true}, lookup tables are registered through the
     * {@code TestTemporalTable$} / {@code TestInvalidTemporalTable$} helpers; otherwise they are
     * declared with {@code CREATE TABLE} DDL using the {@code 'values'} connector.
     */
    private final boolean legacyTableSource;

    /** Test utility obtained from {@code streamTestUtil} called with its default argument. */
    private final StreamTableTestUtil util = streamTestUtil(streamTestUtil$default$1());

    // Cached Scala symbols used as field names when registering the test data streams.
    // They are assigned exactly once, so they are declared final.
    private static final Symbol symbol$1 = Symbol$.MODULE$.apply("a");
    private static final Symbol symbol$2 = Symbol$.MODULE$.apply("b");
    private static final Symbol symbol$3 = Symbol$.MODULE$.apply("c");
    private static final Symbol symbol$4 = Symbol$.MODULE$.apply("proctime");
    private static final Symbol symbol$5 = Symbol$.MODULE$.apply("rowtime");
    private static final Symbol symbol$6 = Symbol$.MODULE$.apply("d");
    private static final Symbol symbol$7 = Symbol$.MODULE$.apply("id");
    private static final Symbol symbol$8 = Symbol$.MODULE$.apply("name");
    private static final Symbol symbol$9 = Symbol$.MODULE$.apply("age");
    private static final Symbol symbol$10 = Symbol$.MODULE$.apply("ts");

    /**
     * Supplies the parameter sets for the {@link Parameterized} runner by delegating to the
     * {@code LookupJoinTest$} singleton. Each set is displayed as {@code LegacyTableSource={0}}.
     *
     * @return the parameter sets returned by {@code LookupJoinTest$.MODULE$.parameters()}
     */
    @Parameterized.Parameters(name = "LegacyTableSource={0}")
    public static Collection<Object[]> parameters() {
        final Collection<Object[]> parameterSets = LookupJoinTest$.MODULE$.parameters();
        return parameterSets;
    }

    /**
     * Accessor for the test utility held in the {@code util} field.
     *
     * @return the {@link StreamTableTestUtil} created when this instance was initialised
     */
    private StreamTableTestUtil util() {
        return util;
    }

    /**
     * Per-test fixture: registers the data streams {@code MyTable}, {@code T1} and
     * {@code nonTemporal}, then registers {@code LookupTable} either through the legacy helper
     * or through {@code CREATE TABLE} DDL, depending on {@code legacyTableSource}.
     */
    @Before
    public void before() {
        final package$ api = package$.MODULE$;
        final Predef$ predef = Predef$.MODULE$;
        final StreamTableTestUtil testUtil = util();

        // MyTable: a, b, c plus a proctime and a rowtime attribute.
        final Expression[] myTableFields = {
            api.symbol2FieldExpression(symbol$1),
            api.symbol2FieldExpression(symbol$2),
            api.symbol2FieldExpression(symbol$3),
            (Expression) api.UnresolvedFieldExpression(symbol$4).proctime(),
            (Expression) api.UnresolvedFieldExpression(symbol$5).rowtime()
        };
        testUtil.addDataStream("MyTable", predef.wrapRefArray(myTableFields), new LookupJoinTest$$anon$5(this));

        // T1: a, b, c, d.
        final Expression[] t1Fields = {
            api.symbol2FieldExpression(symbol$1),
            api.symbol2FieldExpression(symbol$2),
            api.symbol2FieldExpression(symbol$3),
            api.symbol2FieldExpression(symbol$6)
        };
        testUtil.addDataStream("T1", predef.wrapRefArray(t1Fields), new LookupJoinTest$$anon$6(this));

        // nonTemporal: id, name, age.
        final Expression[] nonTemporalFields = {
            api.symbol2FieldExpression(symbol$7),
            api.symbol2FieldExpression(symbol$8),
            api.symbol2FieldExpression(symbol$9)
        };
        testUtil.addDataStream("nonTemporal", predef.wrapRefArray(nonTemporalFields), new LookupJoinTest$$anon$7(this));

        if (!this.legacyTableSource) {
            final String ddl = new StringOps(predef.augmentString("\n          |CREATE TABLE LookupTable (\n          |  `id` INT,\n          |  `name` STRING,\n          |  `age` INT\n          |) WITH (\n          |  'connector' = 'values'\n          |)\n          |")).stripMargin();
            testUtil.addTable(ddl);
            return;
        }
        TestTemporalTable$.MODULE$.createTemporaryTable(testUtil.tableEnv(), "LookupTable", TestTemporalTable$.MODULE$.createTemporaryTable$default$3());
    }

    /**
     * Runs six malformed or unsupported temporal-join queries through
     * {@code expectExceptionThrown}, each with the exception class and message fragment it is
     * expected to produce.
     */
    @Test
    public void testJoinInvalidJoinTemporalTable() {
        // The FOR SYSTEM_TIME AS OF keywords are missing.
        final String missingKeywords = "SELECT * FROM MyTable AS T JOIN LookupTable T.proctime AS D ON T.a = D.id";
        expectExceptionThrown(missingKeywords, "SQL parse failed", SqlParserException.class);

        // The time attribute is T.rowtime instead of T.proctime.
        final String onRowtime = "SELECT * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.rowtime AS D ON T.a = D.id";
        expectExceptionThrown(onRowtime, "Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field", TableException.class);

        // The lookup table is read at a literal timestamp, without any join.
        final String literalTimestamp = "SELECT * FROM LookupTable FOR SYSTEM_TIME AS OF TIMESTAMP '2017-08-09 14:36:11'";
        expectExceptionThrown(literalTimestamp, "Cannot generate a valid execution plan for the given query", TableException.class);

        // RIGHT JOIN; the expected failure is a java.lang.AssertionError.
        final String rightJoin = "SELECT * FROM MyTable AS T RIGHT JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id";
        expectExceptionThrown(rightJoin, "Correlate has invalid join type RIGHT", AssertionError.class);

        // Both sides of the equality are arithmetic expressions rather than plain fields.
        final String expressionKeys = "SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a + 1 = D.id + 2";
        expectExceptionThrown(expressionKeys, "Temporal table join requires an equality condition on fields of table [default_catalog.default_database.LookupTable].", TableException.class);

        // The time attribute is the PROCTIME() function call instead of a field.
        final String proctimeCall = "SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF PROCTIME() AS D ON T.a = D.id";
        expectExceptionThrown(proctimeCall, "Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'", TableException.class);
    }

    /**
     * Expects a {@link TableException} with the same message for both spellings of a null-safe
     * equality join condition: {@code IS NOT DISTINCT FROM} and its expanded
     * {@code (a = b) OR (a IS NULL AND b IS NULL)} form.
     */
    @Test
    public void testNotDistinctFromInJoinCondition() {
        final String unsupportedMessage = "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or alternative '(a = b) or (a IS NULL AND b IS NULL)')";

        final String notDistinctForm = "SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a IS NOT  DISTINCT FROM D.id";
        expectExceptionThrown(notDistinctForm, unsupportedMessage, TableException.class);

        final String expandedForm = "SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id OR (T.a IS NULL AND D.id IS NULL)";
        expectExceptionThrown(expandedForm, unsupportedMessage, TableException.class);
    }

    /**
     * Registers ten lookup tables, each backed by a different lookup function class, and joins
     * stream {@code T} against each one with the same query shape. Depending on the function,
     * the query is expected either to translate or to fail with a {@link TableException}
     * carrying the given message fragment.
     */
    @Test
    public void testInvalidLookupTableFunction() {
        final package$ api = package$.MODULE$;
        final Expression[] streamFields = {
            api.symbol2FieldExpression(symbol$1),
            api.symbol2FieldExpression(symbol$2),
            api.symbol2FieldExpression(symbol$3),
            api.symbol2FieldExpression(symbol$10),
            (Expression) api.UnresolvedFieldExpression(symbol$4).proctime()
        };
        util().addDataStream("T", Predef$.MODULE$.wrapRefArray(streamFields), new LookupJoinTest$$anon$8(this));

        // Every query below is joinPrefix + <table name> + joinSuffix.
        final String joinPrefix = "SELECT * FROM T JOIN ";
        final String joinSuffix = " FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id AND T.b = D.name AND T.ts = D.ts";
        // Shared head of the signature-mismatch message expected from the async functions.
        final String asyncExpectedHead = "Expected: eval(java.util.concurrent.CompletableFuture, java.lang.Integer, org.apache.flink.table.data.StringData, org.apache.flink.table.data.TimestampData) \nActual: ";

        // The source is described differently in the message depending on how it was registered.
        final String sourceDescription;
        if (this.legacyTableSource) {
            sourceDescription = "TableSource [TestInvalidTemporalTable(id, name, age, ts)]";
        } else {
            sourceDescription = "DynamicTableSource [TestValues]";
        }

        createLookupTable("LookupTable1", new InvalidTableFunctionResultType());
        expectExceptionThrown(
                joinPrefix + "LookupTable1" + joinSuffix,
                "Result type of the lookup TableFunction of " + sourceDescription + " is String type, " + "but currently only Row and RowData are supported",
                TableException.class);

        createLookupTable("LookupTable2", new InvalidTableFunctionEvalSignature1());
        expectExceptionThrown(
                joinPrefix + "LookupTable2" + joinSuffix,
                "Expected: eval(java.lang.Integer, org.apache.flink.table.data.StringData, org.apache.flink.table.data.TimestampData) \nActual: eval(java.lang.Integer, java.lang.String, java.time.LocalDateTime)",
                TableException.class);

        createLookupTable("LookupTable3", new ValidTableFunction());
        verifyTranslationSuccess(joinPrefix + "LookupTable3" + joinSuffix);

        createLookupTable("LookupTable4", new ValidTableFunction2());
        verifyTranslationSuccess(joinPrefix + "LookupTable4" + joinSuffix);

        createLookupTable("LookupTable5", new ValidAsyncTableFunction());
        verifyTranslationSuccess(joinPrefix + "LookupTable5" + joinSuffix);

        // NOTE(review): despite its name, this function is expected to translate successfully;
        // the reason is not visible in this file - confirm against the planner.
        createLookupTable("LookupTable6", new InvalidAsyncTableFunctionResultType());
        verifyTranslationSuccess(joinPrefix + "LookupTable6" + joinSuffix);

        createLookupTable("LookupTable7", new InvalidAsyncTableFunctionEvalSignature1());
        expectExceptionThrown(
                joinPrefix + "LookupTable7" + joinSuffix,
                asyncExpectedHead + "eval(java.lang.Integer, org.apache.flink.table.data.StringData, java.time.LocalDateTime)",
                TableException.class);

        createLookupTable("LookupTable8", new InvalidAsyncTableFunctionEvalSignature2());
        expectExceptionThrown(
                joinPrefix + "LookupTable8" + joinSuffix,
                asyncExpectedHead + "eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.String, java.time.LocalDateTime)",
                TableException.class);

        createLookupTable("LookupTable9", new ValidAsyncTableFunction());
        verifyTranslationSuccess(joinPrefix + "LookupTable9" + joinSuffix);

        createLookupTable("LookupTable10", new InvalidAsyncTableFunctionEvalSignature3());
        expectExceptionThrown(
                joinPrefix + "LookupTable10" + joinSuffix,
                asyncExpectedHead + "eval(org.apache.flink.streaming.api.functions.async.ResultFuture, java.lang.Integer, org.apache.flink.table.data.StringData, java.lang.Long)",
                TableException.class);
    }

    /**
     * Joins on {@code T.b = D.id} and expects a {@link TableException} whose message reports
     * that VARCHAR and INTEGER have no common type. The expectation is set up through
     * {@code thrown()} before the plan is verified.
     */
    @Test
    public void testJoinOnDifferentKeyTypes() {
        final String mismatchedKeyJoin = "SELECT * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.b = D.id";
        final String expectedMessage = "VARCHAR(2147483647) and INTEGER does not have common type now";

        thrown().expect(TableException.class);
        thrown().expectMessage(expectedMessage);
        util().verifyPlan(mismatchedKeyJoin);
    }

    /**
     * Uses the plain data stream {@code nonTemporal} as the right side of a temporal join and
     * expects a {@link TableException} stating that a LookupTableSource is required.
     */
    @Test
    public void testJoinInvalidNonTemporalTable() {
        final String joinOnPlainStream = "SELECT * FROM MyTable AS T JOIN nonTemporal FOR SYSTEM_TIME AS OF T.rowtime AS D ON T.a = D.id";
        final String expectedMessage = "Temporal table join only support join on a LookupTableSource";
        expectExceptionThrown(joinOnPlainStream, expectedMessage, TableException.class);
    }

    /** Passes an inner temporal join on {@code T.a = D.id} to {@code util().verifyPlan}. */
    @Test
    public void testJoinTemporalTable() {
        final String innerJoin = "SELECT * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id";
        util().verifyPlan(innerJoin);
    }

    /** Passes a left temporal join on {@code T.a = D.id} to {@code util().verifyPlan}. */
    @Test
    public void testLeftJoinTemporalTable() {
        final String leftJoin = "SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id";
        util().verifyPlan(leftJoin);
    }

    /**
     * Passes a temporal join whose left side is a filtered, projected sub-query of
     * {@code MyTable} to {@code util().verifyPlan}.
     */
    @Test
    public void testJoinTemporalTableWithNestedQuery() {
        final String nestedLeftSide = "SELECT * FROM (SELECT a, b, proctime FROM MyTable WHERE c > 1000) AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id";
        util().verifyPlan(nestedLeftSide);
    }

    /**
     * Passes a temporal join that selects only {@code T.*} and {@code D.id} to
     * {@code util().verifyPlan}. The query text is margin-stripped first.
     */
    @Test
    public void testJoinTemporalTableWithProjectionPushDown() {
        final String rawSql =
                "\n        |SELECT T.*, D.id"
                        + "\n        |FROM MyTable AS T"
                        + "\n        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D"
                        + "\n        |ON T.a = D.id"
                        + "\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        util().verifyPlan(sql);
    }

    /**
     * Passes a temporal join with the extra lookup-side condition {@code D.age = 10} and the
     * stream-side filter {@code T.c > 1000} to {@code util().verifyPlan}.
     */
    @Test
    public void testJoinTemporalTableWithFilterPushDown() {
        final String rawSql =
                "\n        |SELECT * FROM MyTable AS T"
                        + "\n        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D"
                        + "\n        |ON T.a = D.id AND D.age = 10"
                        + "\n        |WHERE T.c > 1000"
                        + "\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        util().verifyPlan(sql);
    }

    /**
     * Passes a temporal join with the lookup-side condition {@code D.age = 10} and a
     * {@code WHERE} clause that casts {@code D.name} to bigint to {@code util().verifyPlan}.
     */
    @Test
    public void testJoinTemporalTableWithCalcPushDown() {
        final String rawSql =
                "\n        |SELECT * FROM MyTable AS T"
                        + "\n        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D"
                        + "\n        |ON T.a = D.id AND D.age = 10"
                        + "\n        |WHERE cast(D.name as bigint) > 1000"
                        + "\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        util().verifyPlan(sql);
    }

    /**
     * Passes a temporal join constrained on three lookup columns ({@code id}, {@code age},
     * {@code name}) plus a stream-side filter to {@code util().verifyPlan}.
     */
    @Test
    public void testJoinTemporalTableWithMultiIndexColumn() {
        final String rawSql =
                "\n        |SELECT * FROM MyTable AS T"
                        + "\n        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D"
                        + "\n        |ON T.a = D.id AND D.age = 10 AND D.name = 'AAA'"
                        + "\n        |WHERE T.c > 1000"
                        + "\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        util().verifyPlan(sql);
    }

    /**
     * Builds a three-level query and passes it to {@code util().verifyPlan}: an aggregation
     * over {@code T1}, a temporal join of that result against {@code LookupTable}, and a second
     * aggregation over the join output. Each level is margin-stripped before it is embedded in
     * the next one.
     */
    @Test
    public void testAvoidAggregatePushDown() {
        final Predef$ predef = Predef$.MODULE$;

        // Innermost level: aggregate T1 and add a PROCTIME() column named proc.
        final String aggregateSql = new StringOps(predef.augmentString(
                "\n        |SELECT b, a, sum(c) c, sum(d) d, PROCTIME() as proc\n        |FROM T1\n        |GROUP BY a, b\n      ")).stripMargin();

        // Middle level: temporal join of the aggregate against LookupTable.
        final String joinSql = new StringOps(predef.augmentString(
                "\n         |SELECT T.* FROM ("
                        + aggregateSql
                        + ") AS T\n         |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proc AS D\n         |ON T.a = D.id\n         |WHERE D.age > 10\n      ")).stripMargin();

        // Outermost level: aggregate the join output again.
        final String outerSql = new StringOps(predef.augmentString(
                "\n         |SELECT b, count(a), sum(c), sum(d)\n         |FROM ("
                        + joinSql
                        + ") AS T\n         |GROUP BY b\n      ")).stripMargin();

        util().verifyPlan(outerSql);
    }

    /**
     * Passes a temporal join whose only join condition is the literal {@code true} to
     * {@code util().verifyPlan}.
     */
    @Test
    public void testJoinTemporalTableWithTrueCondition() {
        final String rawSql =
                "\n        |SELECT * FROM MyTable AS T"
                        + "\n        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D"
                        + "\n        |ON true"
                        + "\n        |WHERE T.c > 1000"
                        + "\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        util().verifyPlan(sql);
    }

    /**
     * Passes a temporal join whose conditions are a function of a lookup column
     * ({@code concat(D.name, '!')}) and a constant comparison ({@code D.age = 11}) to
     * {@code util().verifyPlan}.
     */
    @Test
    public void testJoinTemporalTableWithFunctionAndConstantCondition() {
        final String rawSql =
                "\n        |SELECT * FROM MyTable AS T"
                        + "\n        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D"
                        + "\n        |ON T.b = concat(D.name, '!') AND D.age = 11"
                        + "\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        util().verifyPlan(sql);
    }

    /**
     * Passes a temporal join with two computed lookup-side expressions ({@code D.id + 1} and
     * {@code concat(D.name, '!')}) and one constant comparison to {@code util().verifyPlan}.
     */
    @Test
    public void testJoinTemporalTableWithMultiFunctionAndConstantCondition() {
        final String rawSql =
                "\n        |SELECT * FROM MyTable AS T"
                        + "\n        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D"
                        + "\n        |ON T.a = D.id + 1 AND T.b = concat(D.name, '!') AND D.age = 11"
                        + "\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        util().verifyPlan(sql);
    }

    /**
     * Passes a temporal join that combines a plain key equality, a function of a lookup column
     * and a {@code LIKE} filter on the lookup side to {@code util().verifyPlan}.
     */
    @Test
    public void testJoinTemporalTableWithFunctionAndReferenceCondition() {
        final String rawSql =
                "\n        |SELECT * FROM MyTable AS T"
                        + "\n        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D"
                        + "\n        |ON T.a = D.id AND T.b = concat(D.name, '!')"
                        + "\n        |WHERE D.name LIKE 'Jack%'"
                        + "\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        util().verifyPlan(sql);
    }

    /**
     * Registers a lookup table named {@code str} with columns {@code id}, {@code name},
     * {@code age} and {@code ts}, backed by the given lookup function.
     *
     * <p>When {@code legacyTableSource} is false, the table is declared through DDL with the
     * {@code 'values'} connector and the function's class name as
     * {@code 'lookup-function-class'}. Otherwise the function instance is handed to
     * {@code TestInvalidTemporalTable$}, using the overload that matches its runtime type.
     *
     * @param str name under which the table is registered
     * @param userDefinedFunction lookup function; in legacy mode it must be a
     *     {@link TableFunction} or an {@link AsyncTableFunction}
     * @throws MatchError in legacy mode, if the function is neither of the two supported types
     */
    private void createLookupTable(String str, UserDefinedFunction userDefinedFunction) {
        if (!this.legacyTableSource) {
            // The text fragments contain no backslash escapes, so plain concatenation yields
            // the same string as the Scala s-interpolator did.
            final String rawDdl =
                    "\n           |CREATE TABLE " + str
                            + " (\n           |  `id` INT,\n           |  `name` STRING,\n           |  `age` INT,\n           |  `ts` TIMESTAMP(3)\n           |) WITH (\n           |  'connector' = 'values',\n           |  'lookup-function-class' = '"
                            + userDefinedFunction.getClass().getName()
                            + "'\n           |)\n           |";
            util().addTable(new StringOps(Predef$.MODULE$.augmentString(rawDdl)).stripMargin());
            return;
        }

        // Legacy path: TableFunction is checked first, as in the original dispatch order.
        if (userDefinedFunction instanceof TableFunction) {
            TestInvalidTemporalTable$.MODULE$.createTemporaryTable(util().tableEnv(), str, (TableFunction<?>) userDefinedFunction);
        } else if (userDefinedFunction instanceof AsyncTableFunction) {
            TestInvalidTemporalTable$.MODULE$.createTemporaryTable(util().tableEnv(), str, (AsyncTableFunction<?>) userDefinedFunction);
        } else {
            throw new MatchError(userDefinedFunction);
        }
    }

    /**
     * Asserts that translating {@code str} fails with an exception of exactly class
     * {@code cls} and, when {@code str2} is non-null, that the exception message contains
     * {@code str2}.
     *
     * <p>The translation is the only statement inside the {@code try}, so an assertion failure
     * raised by this helper can never be mistaken for the exception under test. This matters
     * because some callers expect {@link AssertionError} itself.
     *
     * @param str SQL query passed to {@code verifyTranslationSuccess}
     * @param str2 fragment the exception message must contain, or {@code null} to skip the
     *     message check
     * @param cls exact class the thrown exception must have (subclasses do not match)
     * @throws AssertionError if nothing is thrown, if the thrown class differs from
     *     {@code cls} (the unexpected exception is attached as the cause), or if the message
     *     does not contain {@code str2}
     */
    private void expectExceptionThrown(String str, String str2, Class<? extends Throwable> cls) {
        Throwable caught = null;
        try {
            verifyTranslationSuccess(str);
        } catch (Throwable th) {
            // Throwable on purpose: callers may expect Errors such as AssertionError.
            caught = th;
        }

        if (caught == null) {
            Assert.fail("Expected a " + cls + ", but no exception is thrown.");
            return;
        }

        if (!caught.getClass().equals(cls)) {
            // Keep the unexpected exception as the cause so its stack trace is reported with
            // the failure instead of being printed separately.
            throw new AssertionError("Expected throw " + cls.getSimpleName() + ", but is " + caught + ".", caught);
        }

        if (str2 == null) {
            return;
        }

        final String actualMessage = caught.getMessage();
        final String failureText =
                "The actual exception message \n" + actualMessage + "\n"
                        + "doesn't contain expected keyword \n" + str2 + "\n";
        // A null message fails the assertion rather than raising a NullPointerException.
        Assert.assertTrue(failureText, actualMessage != null && actualMessage.contains(str2));
    }

    /**
     * Compiler-generated default for the third parameter of {@code expectExceptionThrown}.
     *
     * @return {@code ValidationException.class}
     */
    private Class<? extends Throwable> expectExceptionThrown$default$3() {
        final Class<? extends Throwable> defaultExceptionClass = ValidationException.class;
        return defaultExceptionClass;
    }

    /**
     * Runs {@code str} through {@code sqlQuery} on the test table environment and calls
     * {@code explain} on the result with no explain details. The returned text is discarded;
     * the call is made only so that any exception it raises propagates to the caller.
     *
     * @param str SQL query to translate
     */
    private void verifyTranslationSuccess(String str) {
        final ExplainDetail[] noDetails = new ExplainDetail[0];
        final StreamTableTestUtil testUtil = util();
        testUtil.tableEnv().sqlQuery(str).explain(noDetails);
    }

    /**
     * Creates the test instance for one parameter set.
     *
     * @param z value stored in {@code legacyTableSource}; it selects between the legacy helper
     *     registration and DDL-based registration of lookup tables
     */
    public LookupJoinTest(boolean z) {
        legacyTableSource = z;
    }
}
