首页 问答 pyflink在读取hdfs文件的时候如何使用通配符?
问题详情

def read_csv_file_example(input_path):
env = StreamExecutionEnvironment.get_execution_environment()

# env.set_parallelism(2)
settings = EnvironmentSettings.new_instance().in_streaming_mode().build() # in_streaming_mode in_batch_mode
t_env = StreamTableEnvironment.create(env, settings)
# 读取 csv 文件
input_descriptor = TableDescriptor.for_connector("filesystem") \
    .option("path", input_path) \
    .format("csv") \
    .option("csv.ignore-parse-errors", "true") \
    .option("csv.field-delimiter", ",") \
    .schema(
    Schema.new_builder()
        .column("device_id", DataTypes.STRING())
        .column("user_id", DataTypes.STRING())
        .column("event_time", DataTypes.TIMESTAMP(3))
        .watermark("event_time", "event_time - INTERVAL '5' SECOND")  # 定义水印生成规则:每个事件的水印 = 该事件的事件时间 - 5 秒
        .build()
).build()
# 注册为临时表
t_env.create_temporary_table('device_events', input_descriptor)
# 创建 Table 对象
csv_table = t_env.from_path('device_events')
# 查看 Schema
csv_table.print_schema()
'''
(
  `device_id` STRING,
  `user_id` STRING,
  `event_time` TIMESTAMP(3) *ROWTIME*,
  WATERMARK FOR `event_time`: TIMESTAMP(3) AS event_time - INTERVAL '5' SECOND
)
'''
# 执行查询
csv_table.limit(20).execute().print()

if name == 'main':
inputpath = "hdfs://10.130.33.76:9000/flink/data2/input*.csv"
read_csv_file_example(input_path)

版权:言论仅代表个人观点,不代表官方立场。转载请注明出处:https://www.stntk.com/question/2328.html

发表评论
暂无评论

还没有评论呢,快来抢沙发~

点击联系客服

在线时间:8:00-16:00

客服QQ

70068002

客服电话

400-888-8888

客服邮箱

70068002@qq.com

扫描二维码

关注微信公众号

扫描二维码

手机访问本站