-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
121 lines (90 loc) · 3.44 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import csv
import json
import boto.sqs
from keboola import docker
# Read the component parameters through the keboola docker helper rooted at
# '/data/'.
cfg = docker.Config('/data/')
parameters = cfg.get_parameters()

print("Starting process...")

# AWS region, credentials and target queue name all come from the parameters;
# a missing key raises KeyError here, before any network call is made.
SQS_AWS_REGION = parameters['SQS_AWS_REGION']
SQS_AWS_ACCESS_KEY_ID = parameters['SQS_AWS_ACCESS_KEY_ID']
SQS_AWS_SECRET_ACCESS_KEY = parameters['SQS_AWS_SECRET_ACCESS_KEY']
SQS_AWS_QUEUE_NAME = parameters['SQS_AWS_QUEUE_NAME']

sqs_conn = boto.sqs.connect_to_region(
    region_name=SQS_AWS_REGION,
    aws_access_key_id=SQS_AWS_ACCESS_KEY_ID,
    aws_secret_access_key=SQS_AWS_SECRET_ACCESS_KEY,
)
# Per the boto 2 documentation, connect_to_region() returns None for an
# unknown region name instead of raising. Fail with a clear message rather
# than an AttributeError on the next line.
if sqs_conn is None:
    raise RuntimeError("Unknown SQS region: {!r}".format(SQS_AWS_REGION))

queue = sqs_conn.get_queue(queue_name=SQS_AWS_QUEUE_NAME)
# get_queue() likewise returns None when no queue matches. Without this check
# every later send would fail while the job still reports success.
if queue is None:
    raise RuntimeError("SQS queue not found: {!r}".format(SQS_AWS_QUEUE_NAME))
# CSV dialect settings shared by every csv.DictReader in this script.
# NOTE: csv readers ignore `lineterminator` (they always recognise '\r' and
# '\n'); the name is kept because later code passes it along as well.
csvlt = '\n'
csvdel = ','
csvquo = '"'

# Maps each attribute name to the JSON value type and number format it should
# be exported with, as listed in the INPUT_FORMAT file.
columns = {}

# newline='' lets the csv module do its own line-ending handling, which is
# required for quoted fields that contain line breaks.
with open(parameters['INPUT_FORMAT'], mode='rt', encoding='utf-8', newline='') as in_file:
    # Drop NUL characters before parsing (presumably because the csv reader
    # rejects lines containing them on older Python versions - confirm).
    lazy_lines = (line.replace('\0', '') for line in in_file)
    reader = csv.DictReader(
        lazy_lines,
        lineterminator=csvlt,
        delimiter=csvdel,
        quotechar=csvquo,
    )
    for row in reader:
        columns[row['standard_attribute_name']] = {
            'type': row['json_value_type'],
            'format': row['format_info'],
        }
def _convert_value(raw, value_type, value_format):
    """Convert a non-empty CSV cell into the JSON value its column spec asks for.

    Args:
        raw: The cell text (non-empty string).
        value_type: One of "string", "object", "number" or "array"; any other
            value yields None.
        value_format: For "number" columns, "integer" or "float"; any other
            value yields None.

    Returns:
        The converted value, ready to be serialised with json.dumps.

    Raises:
        ValueError: If a "number" cell is not numeric or an "array" cell is
            not valid JSON.
        TypeError: If an "array" cell parses to a non-iterable JSON value.
    """
    if value_type == "string":
        return str(raw)

    if value_type == "object":
        try:
            parsed = json.loads(raw)
        except ValueError:
            # Unparseable JSON is exported as an empty object.
            return {}
        try:
            # Prefer the reduced {"id": <int>} form when the object has a
            # usable id ...
            return {'id': int(parsed['id'])}
        except (KeyError, TypeError, ValueError, OverflowError):
            # ... otherwise pass the parsed JSON through unchanged.
            return parsed

    if value_type == "number":
        if value_format == "integer":
            # Go through float so that inputs such as "3.0" are accepted.
            return int(float(raw))
        if value_format == "float":
            return float(raw)
        return None

    if value_type == "array":
        return list(json.loads(raw))

    return None


# Stream the data file; every CSV row becomes one JSON message on the queue.
# newline='' lets the csv module handle line breaks inside quoted fields.
with open(parameters['INPUT'], mode='rt', encoding='utf-8', newline='') as in_file:
    lazy_lines = (line.replace('\0', '') for line in in_file)
    reader = csv.DictReader(
        lazy_lines,
        lineterminator=csvlt,
        delimiter=csvdel,
        quotechar=csvquo,
    )
    for row in reader:
        data = {}
        for key, spec in columns.items():
            raw = row.get(key)

            # Booleans are always exported: only the literal "true" is True,
            # and a missing or empty cell is False.
            if spec['type'] == "boolean":
                data[key] = raw == "true"
                continue

            # NOTE(review): the original file reached review with its
            # indentation stripped; missing and empty cells of other types
            # are assumed to be left out of the message - confirm.
            if not raw:
                continue

            data[key] = _convert_value(raw, spec['type'], spec['format'])

        message = json.dumps(data)
        try:
            sqs_conn.send_message(queue=queue, message_content=message)
        except Exception as exc:
            # Best effort: report the failure with its cause and carry on
            # with the next row.
            print("Failed to push message:", exc)

print("Job done!")