Skip to content

Commit

Permalink
Unit test to reproduce the issue
Browse files Browse the repository at this point in the history
We expect both sides of the upper-level 'DataStreamGroupWindowAggregate'
to have different sliding windows (6hrs and 2hrs),
but the planner gives a plan where both sides have the same sliding
window.
  • Loading branch information
Benoit Hanotte committed Jan 13, 2020
1 parent d76f216 commit 3d388f1
Showing 1 changed file with 115 additions and 0 deletions.
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.plan

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.types.Row
import org.junit.Test

/**
  * Plan test for a UNION ALL of two sliding-window aggregations that are themselves computed
  * over the same lower-level UNION ALL of two source tables.
  */
class UnionWindowsTest extends TableTestBase {

  /**
    * Registers two single-column tables, builds a query that unions a 6-hour and a 2-hour HOP
    * aggregation before applying a 1-hour TUMBLE aggregation, and verifies the optimized plan
    * against the expected plan text.
    */
  @Test
  def testUnionDifferentWindows(): Unit = {
    // Shared row type of both source tables: one field named "timestamp", typed as
    // ROWTIME_INDICATOR.
    val rowtimeOnlyType = Types.ROW(
      Array("timestamp"),
      Array[TypeInformation[_]](TimeIndicatorTypeInfo.ROWTIME_INDICATOR))

    val testUtil = streamTestUtil()
    // Registration order is kept: displays first, then clicks.
    Seq("table_display", "table_click").foreach { tableName =>
      testUtil.addTable[Row](tableName, 'timestamp)(rowtimeOnlyType)
    }

    // The query is somewhat involved, but removing the lower-level UNION between clicks and
    // displays seems to make the problem disappear.
    val query =
      """
        |WITH displays AS (
        | SELECT `timestamp`, 1 as nb_displays, 0 as nb_clicks FROM table_display
        |),
        |
        |clicks AS (
        | SELECT `timestamp`, 0 as nb_displays, 1 as nb_clicks FROM table_click
        |),
        |
        |counts_2h AS (
        | SELECT
        | SUM(nb_clicks) / SUM(nb_displays) as ctr,
        | HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
        | FROM (
        | (SELECT * FROM displays)
        | UNION ALL
        | (SELECT * FROM clicks)
        | ) t
        | GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
        |),
        |
        |counts_6h AS (
        | SELECT
        | SUM(nb_clicks) / SUM(nb_displays) as ctr,
        | HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR) as `timestamp`
        | FROM (
        | (SELECT * FROM displays)
        | UNION ALL
        | (SELECT * FROM clicks)
        | ) t
        | GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
        |)
        |
        |SELECT
        | TUMBLE_END(`timestamp`, INTERVAL '1' HOUR) as `timestamp`,
        | MAX(ctr)
        |FROM (
        | (SELECT * FROM counts_6h)
        | UNION ALL
        | (SELECT * FROM counts_2h)
        |) t
        |GROUP BY TUMBLE(`timestamp`, INTERVAL '1' HOUR)
        |""".stripMargin

    // The two inputs of the upper-level 'DataStreamGroupWindowAggregate' are expected to carry
    // different sliding windows (6 hours and 2 hours); the planner instead yields a plan in
    // which both inputs use the same sliding window.
    val expectedPlan =
      """DataStreamCalc(select=[w$end AS timestamp, CAST(EXPR$1) AS EXPR$1])
        | DataStreamGroupWindowAggregate(window=[TumblingGroupWindow('w$, 'timestamp, 3600000.millis)], select=[MAX(ctr) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
        | DataStreamUnion(all=[true], union all=[ctr, timestamp])
        | DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp])
        | DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 21600000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
        | DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks])
        | DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks])
        | DataStreamScan(id=[1], fields=[timestamp])
        | DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks])
        | DataStreamScan(id=[2], fields=[timestamp])
        | DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp])
        | DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
        | DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks])
        | DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks])
        | DataStreamScan(id=[1], fields=[timestamp])
        | DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks])
        | DataStreamScan(id=[2], fields=[timestamp])
        |""".stripMargin

    testUtil.verifySql(query, expectedPlan)
  }

}

0 comments on commit 3d388f1

Please sign in to comment.