123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- from random_data import RandomData
- import tempfile
- import os
- from azure.storage.fileshare import ShareServiceClient
class FileBasicSamples():
    """Demonstrate basic Azure Files operations against a storage account.

    The sample creates a share and a directory, uploads a file from text and
    from a local path, starts a server-side copy, writes a byte range,
    downloads the file, lists the share contents, and finally deletes
    everything it created.
    """

    def __init__(self):
        # Source of the random suffixes used for share and file names.
        self.random_data = RandomData()

    def run_all_samples(self, connection_string):
        """Run the basic file sample, then clean up what it created.

        Errors raised while running the sample are printed rather than
        propagated, so this method always returns ``None``.

        Args:
            connection_string: Storage account connection string passed to
                ``ShareServiceClient.from_connection_string``.
        """
        print('Azure Storage File Basis samples - Starting.')

        # Presumably yields a random name of the requested length, keeping
        # repeated runs from colliding -- confirm against RandomData.
        filename = 'filesample' + self.random_data.get_random_name(6)
        sharename = 'sharesample' + self.random_data.get_random_name(6)

        # Bound before the try so the finally clause can tell whether the
        # client was ever created.
        service = None
        try:
            service = ShareServiceClient.from_connection_string(conn_str=connection_string)
            print('\n\n* Basic file operations *\n')
            self.basic_file_operations(sharename, filename, service)
        except Exception as e:
            # str() is required: concatenating the exception object itself
            # raises TypeError and hides the real error.
            print('error:' + str(e))
        finally:
            # Without a client there is nothing to clean up (and nothing to
            # clean up with).
            if service is not None:
                self.file_delete_samples(sharename, filename, service)
        print('\nAzure Storage File Basic samples - Completed.\n')

    def basic_file_operations(self, sharename, filename, service):
        """Create a share and exercise upload, copy, range, download and list.

        Args:
            sharename: Name of the share to create.
            filename: Name of the sample file uploaded to the share.
            service: Share service client used to create the share.
        """
        print('\nAttempting to create a sample file from text for upload demonstration.')

        print('Creating sample share.')
        share_client = service.create_share(share_name=sharename)
        print('Sample share "'+ sharename +'" created.')

        print('Creating a sample directory.')
        directory_client = share_client.create_directory("mydirectory")
        print('Sample directory "mydirectory" created.')

        print('Uploading a sample file from text.')
        file_client = directory_client.get_file_client(filename)
        file_client.upload_file('Hello World! - from text sample')
        print('Sample file "' + filename + '" created and uploaded to: ' + sharename + '/mydirectory')

        print('\nCopying file ' + filename)
        destination_file_client = share_client.get_file_client('file1copy')
        copy_resp = destination_file_client.start_copy_from_url(source_url=file_client.url)
        if copy_resp['copy_status'] == 'pending':
            print('Abort copy operation')
            # The abort is issued on the destination client and must name
            # the copy operation being cancelled.
            destination_file_client.abort_copy(copy_resp['copy_id'])
        else:
            print('Copy was a ' + copy_resp['copy_status'])

        print('\nAttempting to upload a sample file from path for upload demonstration.')

        print('Creating a temporary file from text.')
        # delete=False so the file survives the with-block and can be
        # reopened by name below; it is removed explicitly afterwards.
        with tempfile.NamedTemporaryFile(delete=False) as my_temp_file:
            my_temp_file.write(b"Hello world!")
        print('Sample temporary file created.')

        try:
            print('Uploading a sample file from local path.')
            # Same file name as before, but in the share root this time.
            file_client = share_client.get_file_client(filename)
            with open(my_temp_file.name, "rb") as source_file:
                file_client.upload_file(source_file)
            print('Sample file "' + filename + '" uploaded from path to share: ' + sharename)
        finally:
            # Remove the local temp file even when the upload fails.
            os.remove(my_temp_file.name)

        print('\nGet list of valid ranges of the file.')
        file_ranges = file_client.get_ranges()
        data = b'abcdefghijkl'
        print('Put a range of data to the file.')
        # Write at the start of the first valid range, or at offset 0 when
        # the service reports no ranges.
        range_offset = file_ranges[0]['start'] if file_ranges else 0
        file_client.upload_range(data=data, offset=range_offset, length=len(data))

        print('\nAttempting to download a sample file from Azure files for demonstration.')
        # gettempdir() always returns a usable directory, and os.path.join
        # picks the separator for the current platform.
        destination_file = os.path.join(tempfile.gettempdir(), 'mypathfile.txt')
        with open(destination_file, "wb") as file_handle:
            downloader = file_client.download_file()
            downloader.readinto(file_handle)
        print('Sample file downloaded to: ' + destination_file)

        print('\nAttempting to list files and directories directory under share "' + sharename + '":')
        for file_or_dir in share_client.list_directories_and_files():
            print(file_or_dir['name'])

        print('Files and directories under share "' + sharename + '" listed.')
        print('\nCompleted successfully - Azure basic Files operations.')

    def file_delete_samples(self, sharename, filename, service):
        """Delete the sample file, directory and share.

        Cleanup is best-effort: any exception is printed, not re-raised.

        Args:
            sharename: Name of the share to delete.
            filename: Name of the sample file inside ``mydirectory``.
            service: Share service client used to reach the share.
        """
        print('\nDeleting all samples created for this demonstration.')
        try:
            print('Deleting a sample file.')
            share_client = service.get_share_client(sharename)
            directory_client = share_client.get_directory_client('mydirectory')

            directory_client.delete_file(file_name=filename)
            print('Sample file "' + filename + '" deleted from: ' + sharename + '/mydirectory' )

            print('Deleting sample directory and all files and directories under it.')
            share_client.delete_directory('mydirectory')
            print('Sample directory "/mydirectory" deleted from: ' + sharename)

            print('Deleting sample share ' + sharename + ' and all files and directories under it.')
            # The share client is already bound to this share; a positional
            # argument here would be taken as the delete_snapshots option.
            share_client.delete_share()
            print('Sample share "' + sharename + '" deleted.')
            print('\nCompleted successfully - Azure Files samples deleted.')
        except Exception as e:
            print('********ErrorDelete***********')
            print(e)
|