123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- #!/usr/bin/env python
- import argparse
- import json
- import os
- import sys
- import re
- import requests
- import shutil
- sys.path.append(os.path.join(os.path.dirname(__file__), '../libs'))
- from core.storage_client_factory import StorageClientFactory
- from core.automan_client import AutomanClient
- TEMP_DIR = '/temp'
def get_cv_color(colors, name):
    """Return the colour registered under ``name`` as a (blue, green, red) tuple.

    Args:
        colors: mapping from a class name to a hex colour string. The slicing
            below assumes one leading character followed by six hex digits,
            e.g. ``'#RRGGBB'``.
        name: key to look up in ``colors``.

    Returns:
        A 3-tuple of ints in blue, green, red order; the caller in this file
        hands it to ``cv2.rectangle`` as the drawing colour.

    Raises:
        KeyError: if ``name`` is not present in ``colors``.
        ValueError: if any two-character slice is not valid hexadecimal.
    """
    hex_code = colors[name]
    # Channels are read back to front (offsets 5, 3, 1) to flip RGB into BGR.
    blue, green, red = (
        int(hex_code[offset:offset + 2], 16) for offset in (5, 3, 1)
    )
    return (blue, green, red)
class AutomanArchiver(object):
    """Download annotations (and optionally frame data) of a project into TEMP_DIR.

    All methods are class/static methods; the class carries no instance state.
    Every API call goes through ``AutomanClient.send_get``.
    """

    @classmethod
    def archive(cls, automan_info, archive_info):
        """Write per-frame annotation JSON and, if requested, frame files.

        Args:
            automan_info: connection settings passed through to
                ``AutomanClient``; ``'host'`` and ``'jwt'`` are read here when
                downloading frame data.
            archive_info: dict providing ``'project_id'``, ``'annotation_id'``
                and ``'original_id'``; ``'dataset_id'`` is additionally read
                when the optional ``'include_image'`` flag (default ``False``)
                is truthy.

        Output layout under ``TEMP_DIR``:
            ``Annotations/``         one JSON file per frame
            ``Images/``              downloaded candidate files
            ``Images_Annotations/``  ``.jpg`` frames with 2D boxes drawn on them
        """
        print(archive_info)
        annotations_dir = os.path.join(TEMP_DIR, 'Annotations')
        images_dir = os.path.join(TEMP_DIR, 'Images')
        image_annotations_dir = os.path.join(TEMP_DIR, 'Images_Annotations')

        project_id = archive_info['project_id']
        annotation_id = archive_info['annotation_id']

        # whether or not to write image in bag file to image files
        is_including_image = archive_info.get('include_image', False)
        max_frame = cls.__get_frame_range(automan_info, project_id, annotation_id)
        colors = cls.__get_annotation_color(automan_info, project_id)
        candidates = cls.__get_candidates(
            automan_info, project_id, archive_info['original_id'])

        # Frame numbers sent to the API are 1-based.
        for frame_no in range(1, max_frame + 1):
            annotation = cls.__get_annotation(
                automan_info, project_id, annotation_id, frame_no, annotations_dir)
            if not is_including_image:
                continue
            for candidate in candidates:
                file_name = cls.__get_annotation_image(
                    automan_info, project_id, archive_info['dataset_id'],
                    candidate['id'], frame_no, candidate['ext'], images_dir)
                # Only '.jpg' downloads come back with a name, so only those
                # get an annotated copy.
                if file_name is not None:
                    cls.__draw_annotation(
                        file_name, annotation, colors, images_dir, image_annotations_dir)

    @staticmethod
    def __get_frame_range(automan_info, project_id, annotation_id):
        """Return ``frame_count`` of the dataset the annotation belongs to."""
        path = f'/projects/{project_id}/annotations/{annotation_id}/'
        res = AutomanClient.send_get(automan_info, path).json()
        dataset_path = f"/projects/{project_id}/datasets/{res['dataset_id']}/"
        dataset = AutomanClient.send_get(automan_info, dataset_path).json()
        return dataset['frame_count']

    @staticmethod
    def __get_candidates(automan_info, project_id, original_id):
        """Return ``[{'id': ..., 'ext': ...}, ...]`` for the original's candidates.

        ``ext`` is ``'.jpg'`` when the record's ``data_type`` is ``'IMAGE'`` and
        ``'.pcd'`` for every other data type.
        """
        path = f'/projects/{project_id}/originals/{original_id}/candidates/'
        res = AutomanClient.send_get(automan_info, path).json()
        return [
            {
                'id': record['candidate_id'],
                'ext': '.jpg' if record['data_type'] == 'IMAGE' else '.pcd',
            }
            for record in res['records']
        ]

    @staticmethod
    def __get_annotation(automan_info, project_id, annotation_id, frame, annotations_dir):
        """Fetch one frame's annotation objects, save them as JSON, return them.

        The file is written to ``<annotations_dir>/<frame zero-padded to 6>.json``.
        """
        path = (f'/projects/{project_id}/annotations/{annotation_id}'
                f'/frames/{frame}/objects/')
        res = AutomanClient.send_get(automan_info, path).json()
        # TODO format to "kitti format"

        # ensure directory
        os.makedirs(annotations_dir, exist_ok=True)
        json_path = os.path.join(annotations_dir, str(frame).zfill(6) + '.json')
        # The handle gets its own name so it no longer rebinds the ``frame`` argument.
        with open(json_path, mode='w') as json_file:
            json_file.write(json.dumps(res))
        return res

    @staticmethod
    def __get_annotation_image(automan_info, project_id, dataset_id, candidate_id, frame, ext, images_dir):
        """Download one candidate frame into ``images_dir``.

        The API response body is used as the download URL.

        Returns:
            The written file name when ``ext`` is ``'.jpg'``; ``None`` for any
            other extension (the file is still written) or when the download
            answers with a non-2xx status (nothing is written).
        """
        path = (f'/projects/{project_id}/datasets/{dataset_id}'
                f'/candidates/{candidate_id}/frames/{frame}/')
        img_url = AutomanClient.send_get(automan_info, path).text

        # Attach the JWT only when the URL points back at the Automan host.
        # A literal substring test is used instead of treating the host as a
        # regular expression, where '.' would match any character and let a
        # look-alike URL receive the token.
        if automan_info['host'] in img_url:
            headers = {
                'Authorization': 'JWT ' + automan_info['jwt'],
            }
        else:
            headers = {}

        res = requests.get(img_url, headers=headers)
        # The previous chained comparison (200 > status >= 300) could never be
        # true, so error bodies were silently stored as image files.
        if not 200 <= res.status_code < 300:
            print(f'get annotation image status_code = {res.status_code}. body = {res.text}')
            return None

        # write images
        os.makedirs(images_dir, exist_ok=True)
        file_name = str(candidate_id) + '_' + str(frame).zfill(6) + ext
        img_path = os.path.join(images_dir, file_name)
        with open(img_path, mode='wb') as img_file:
            img_file.write(res.content)

        if ext == '.jpg':
            return file_name
        return None

    @staticmethod
    def __draw_annotation(file_name, annotation, colors, images_dir, image_annotations_dir):
        """Draw every 2D box of ``annotation`` on the image and save a copy.

        Reads ``<images_dir>/<file_name>`` and writes the result under the same
        name to ``image_annotations_dir``. Returns ``0`` without touching any
        file when ``annotation['count']`` is 0; otherwise returns ``None``.
        """
        # Imported lazily so OpenCV is only needed when images are requested.
        import cv2

        if annotation['count'] == 0:
            return 0
        os.makedirs(image_annotations_dir, exist_ok=True)
        img = cv2.imread(os.path.join(images_dir, file_name))
        for record in annotation['records']:
            for content in record['content'].values():
                # Entries without 2D coordinates have nothing to draw.
                if 'min_x_2d' not in content:
                    continue
                top_left = (content['min_x_2d'], content['min_y_2d'])
                bottom_right = (content['max_x_2d'], content['max_y_2d'])
                cv_color = get_cv_color(colors, record['name'])
                cv2.rectangle(img, top_left, bottom_right, cv_color, 2)
        cv2.imwrite(os.path.join(image_annotations_dir, file_name), img)

    @staticmethod
    def __get_annotation_color(automan_info, project_id):
        """Return a ``{class name: colour}`` dict built from the project's klassset.

        Each record's ``config`` field is a JSON string whose ``'color'`` entry
        is used as the value.
        """
        path = f'/projects/{project_id}/'
        res = AutomanClient.send_get(automan_info, path).json()
        return {
            record['name']: json.loads(record['config'])['color']
            for record in res['klassset']['records']
        }
def main(automan_info, archive_info, storage_type, storage_info):
    """Build the annotation archive, optionally upload it, and report the result.

    Args:
        automan_info: JSON string with the Automan connection settings.
        archive_info: JSON string; ``'archive_dir'``, ``'archive_name'`` and
            ``'annotation_id'`` are read here, the rest by ``AutomanArchiver``.
        storage_type: storage backend name; only ``'AWS_S3'`` triggers an upload.
        storage_info: JSON string handed to ``StorageClientFactory.create``.
            It is parsed unconditionally.
    """
    automan_cfg = json.loads(automan_info)
    archive_cfg = json.loads(archive_info)

    # Normalise the target directory to exactly one trailing slash.
    archive_dir = f"{archive_cfg['archive_dir'].rstrip('/')}/"

    storage_cfg = json.loads(storage_info)
    storage_client = StorageClientFactory.create(storage_type, storage_cfg, archive_cfg)

    AutomanArchiver.archive(automan_cfg, archive_cfg)

    # Pack everything collected under TEMP_DIR into <archive_dir><archive_name>.tar.gz
    archive_base = archive_dir + archive_cfg['archive_name']
    shutil.make_archive(archive_base, 'gztar', root_dir=TEMP_DIR)

    if storage_type == 'AWS_S3':
        storage_client.upload(automan_cfg, archive_dir)

    # TODO post : ArchiviedLabelDataset
    result = {
        'file_path': archive_cfg['archive_dir'],
        'file_name': archive_cfg['archive_name'] + '.tar.gz',
        'annotation_id': archive_cfg['annotation_id'],
    }
    AutomanClient.send_result(automan_cfg, result)
if __name__ == '__main__':
    # Command-line entry point: the storage options are optional, the two
    # JSON info arguments are mandatory.
    cli = argparse.ArgumentParser()
    for option, mandatory in (
        ('--storage_type', False),
        ('--storage_info', False),
        ('--automan_info', True),
        ('--archive_info', True),
    ):
        cli.add_argument(option, required=mandatory)
    opts = cli.parse_args()
    main(opts.automan_info, opts.archive_info, opts.storage_type, opts.storage_info)
|