wake-up-neo.com

Kopieren von Daten aus dem Azure Blob-Speicher in AWS S3

Ich bin neu in Azure Data Factory und habe eine interessante Anforderung.

Ich muss Dateien aus dem Azure Blob-Speicher in Amazon S3 verschieben, idealerweise mithilfe von Azure Data Factory.

S3 wird jedoch nicht als Senke unterstützt.

 enter image description here

https://docs.Microsoft.com/en-us/Azure/data-factory/copy-activity-overview

Ich verstehe auch aus einer Reihe von Kommentaren, die ich hier gelesen habe, dass Sie nicht direkt von Blob Storage auf S3 kopieren können - Sie müssten die Datei lokal herunterladen und dann auf S3 hochladen.

Kennt jemand Beispiele in Data Factory, SSIS oder Azure Runbook, die eine solche Funktion ausführen können? Ich nehme an, dass eine Option darin besteht, eine Azure-Logik-App oder -Funktion zu schreiben, die von Data Factory aufgerufen wird.

3
James Cooke

Hat es geschafft, etwas zum Laufen zu bringen - es könnte für jemand anderen nützlich sein.

Ich habe beschlossen, eine Azure-Funktion zu schreiben, die eine HTTP-Anforderung als Auslöser verwendet.

Diese beiden Beiträge haben mir sehr geholfen;

Wie kann ich NuGet-Pakete in meinen Azure-Funktionen verwenden?

Kopieren von Azure Blob nach AWS S3 mit C #

Beachten Sie meine Antwort auf die Nuget-Pakete, wenn Sie Azure-Funktionen 2.x verwenden.

Hier ist der Code - Sie können die Basis dafür an Ihre Bedürfnisse anpassen. Ich gebe ein JSON Serialized-Objekt zurück, da Azure Data Factory dies als Antwort auf eine von einer Pipeline gesendete http-Anfrage benötigt.

#r "Microsoft.WindowsAzure.Storage"
#r "Newtonsoft.Json"
#r "System.Net.Http"

using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using Microsoft.WindowsAzure.Storage.Blob;
using System.Net.Http;
using Amazon.S3; 
using Amazon.S3.Model;
using Amazon.S3.Transfer;
using Amazon.S3.Util;


public static async  Task<IActionResult> Run(HttpRequest req, ILogger log)
{
    log.LogInformation("Example Function has recieved a HTTP Request");

    // get Params from query string
    string blobUri = req.Query["blobUri"];
    string bucketName = req.Query["bucketName"];

    // Validate query string
    if (String.IsNullOrEmpty(blobUri) || String.IsNullOrEmpty(bucketName)) {

        Result outcome = new Result("Invalid Parameters Passed to Function",false,"blobUri or bucketName is null or empty");
        return new BadRequestObjectResult(outcome.ConvertResultToJson());
    }

    // cast the blob to its type
    Uri blobAbsoluteUri = new Uri(blobUri);
    CloudBlockBlob blob = new CloudBlockBlob(blobAbsoluteUri);

    // Do the Copy
    bool resultBool = await CopyBlob(blob, bucketName, log);

    if (resultBool) { 
        Result outcome = new Result("Copy Completed",true,"Blob: " + blobUri + " Copied to Bucket: " + bucketName);
        return (ActionResult)new OkObjectResult(outcome.ConvertResultToJson());       
    }
    else {
        Result outcome = new Result("ERROR",false,"Copy was not successful Please review Application Logs");
        return new BadRequestObjectResult(outcome.ConvertResultToJson()); 
    }  
}

static async Task<bool> CopyBlob(CloudBlockBlob blob, string existingBucket, ILogger log) {

        var accessKey = "myAwsKey";
        var secretKey = "myAwsSecret";
        var keyName = blob.Name;

        // Make the client 
        AmazonS3Client myClient = new AmazonS3Client(accessKey, secretKey, Amazon.RegionEndpoint.EUWest1);

        // Check the Target Bucket Exists; 
        bool bucketExists = await AmazonS3Util.DoesS3BucketExistAsync (myClient,existingBucket);

        if (!bucketExists) {
            log.LogInformation("Bucket: " + existingBucket + " does not exist or is inaccessible to the application");
            return false;
        }

        // Set up the Transfer Utility
        TransferUtility fileTransferUtility = new TransferUtility(myClient);

        // Stream the file
        try {

            log.LogInformation("Starting Copy");

            using (var stream = await blob.OpenReadAsync()) {

                // Note: You need permissions to not be private on the source blob
                log.LogInformation("Streaming");

                await fileTransferUtility.UploadAsync(stream,existingBucket,keyName);

                log.LogInformation("Streaming Done");   
            }

            log.LogInformation("Copy completed");
        }
        catch (AmazonS3Exception e) {
                log.LogInformation("Error encountered on server. Message:'{0}' when writing an object", e.Message);
            }
        catch (Exception e) {
                log.LogInformation("Unknown encountered on server. Message:'{0}' when writing an object", e.Message);
                return false;
        }

        return true; 
    }

public class Result {

    public string result;
    public bool outcome;
    public string UTCtime;
    public string details; 

    public Result(string msg, bool outcomeBool, string fullMsg){
        result=msg;
        UTCtime=DateTime.Now.ToString("yyyy-MM-dd h:mm:ss tt");
        outcome=outcomeBool;
        details=fullMsg;
    }

    public string ConvertResultToJson() {
        return JsonConvert.SerializeObject(this);
    } 
}
1
James Cooke