/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
/*
Package api contains the REST API for RUFS.
/about
Endpoint which returns an object with version information.
{
api_versions : List of available API versions e.g. [ "v1" ]
product : Name of the API provider (RUFS)
version : Version of the API provider
}
*/
package api
import (
"encoding/json"
"net/http"
"devt.de/krotik/rufs/config"
)
/*
EndpointAbout is the about endpoint definition (rooted). Handles about/

The value is APIRoot followed by "/about/"; the trailing slash is part of
the constant.
*/
const EndpointAbout = APIRoot + "/about/"
/*
AboutEndpointInst is the factory function of the about endpoint. Every call
returns a newly allocated handler object.
*/
func AboutEndpointInst() RestEndpointHandler {
	return new(aboutEndpoint)
}
/*
aboutEndpoint is the handler object for about operations. It embeds a
*DefaultEndpointHandler; the methods it defines itself in this package are
HandleGET and SwaggerDefs.
*/
type aboutEndpoint struct {
*DefaultEndpointHandler
}
/*
HandleGET returns about data for the REST API. The response is a JSON object
with the keys "product" and "version".
*/
func (a *aboutEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {

	// Announce the content type before the body is written

	w.Header().Set("content-type", "application/json; charset=utf-8")

	// Encode the about object straight into the response

	json.NewEncoder(w).Encode(map[string]interface{}{
		"product": "RUFS",
		"version": config.ProductVersion,
	})
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package api
import (
"crypto/tls"
"fmt"
"net/http"
"strings"
"sync"
"devt.de/krotik/rufs"
)
const (
	/*
		APIVersion is the version of the REST API
	*/
	APIVersion = "1.0.0"

	/*
		APIRoot is the API root directory for the REST API. Endpoint URLs are
		built by appending to this value.
	*/
	APIRoot = "/fs"
)

var (
	/*
		APISchemes defines the supported schemes by the REST API
	*/
	APISchemes = []string{"https"}

	/*
		APIHost is the host definition for the REST API
	*/
	APIHost = "localhost:9040"
)
/*
RestEndpointInst models a factory function for REST endpoint handlers.
RegisterRestEndpoints calls the factory for every incoming request to obtain
the handler which serves that request.
*/
type RestEndpointInst func() RestEndpointHandler
/*
GeneralEndpointMap is a map of urls to general REST endpoints. It contains
the about and the swagger endpoint.
*/
var GeneralEndpointMap = map[string]RestEndpointInst{
EndpointAbout: AboutEndpointInst,
EndpointSwagger: SwaggerEndpointInst,
}
/*
RestEndpointHandler models a REST endpoint handler.

RegisterRestEndpoints calls the method which matches the HTTP method of a
request. The resources parameter holds the "/"-separated path elements which
follow the endpoint URL; it is nil if there are none.
*/
type RestEndpointHandler interface {
/*
HandleGET handles a GET request.
*/
HandleGET(w http.ResponseWriter, r *http.Request, resources []string)
/*
HandlePOST handles a POST request.
*/
HandlePOST(w http.ResponseWriter, r *http.Request, resources []string)
/*
HandlePUT handles a PUT request.
*/
HandlePUT(w http.ResponseWriter, r *http.Request, resources []string)
/*
HandleDELETE handles a DELETE request.
*/
HandleDELETE(w http.ResponseWriter, r *http.Request, resources []string)
/*
SwaggerDefs is used to describe the endpoint in swagger. The swagger endpoint
passes in the swagger document as a map which the implementation extends.
*/
SwaggerDefs(s map[string]interface{})
}
/*
trees is a map of all trees which can be used by the REST API (tree name to
tree). treesLock is taken by the default tree functions of this file before
they touch the map.
*/
var (
	trees     = make(map[string]*rufs.Tree)
	treesLock sync.Mutex
)
/*
ResetTrees removes all registered trees by replacing the registry with an
empty map while holding treesLock.
*/
var ResetTrees = func() {
	treesLock.Lock()
	trees = map[string]*rufs.Tree{}
	treesLock.Unlock()
}
/*
Trees is a getter function which returns a map of all registered trees.
The returned map is a copy of the registry, so changes to it do not affect
the registered trees. The default implementation never returns an error.
This function can be overwritten by client code to implement access
control.
*/
var Trees = func() (map[string]*rufs.Tree, error) {
	treesLock.Lock()
	defer treesLock.Unlock()

	snapshot := make(map[string]*rufs.Tree, len(trees))

	for name, tree := range trees {
		snapshot[name] = tree
	}

	return snapshot, nil
}
/*
GetTree returns a specific tree. The boolean result tells whether a tree is
registered under the given id; the default implementation never returns an
error. This function can be overwritten by client code to implement access
control.
*/
var GetTree = func(id string) (*rufs.Tree, bool, error) {
	treesLock.Lock()
	found, exists := trees[id]
	treesLock.Unlock()

	return found, exists, nil
}
/*
AddTree adds a new tree. An error is returned if a tree is already
registered under the given id. This function can be overwritten by
client code to implement access control.
*/
var AddTree = func(id string, tree *rufs.Tree) error {
	treesLock.Lock()
	defer treesLock.Unlock()

	_, exists := trees[id]

	if exists {
		return fmt.Errorf("Tree %v already exists", id)
	}

	trees[id] = tree

	return nil
}
/*
RemoveTree removes a tree. An error is returned if no tree is registered
under the given id. This function can be overwritten by client code to
implement access control.
*/
var RemoveTree = func(id string) error {
	treesLock.Lock()
	defer treesLock.Unlock()

	_, exists := trees[id]

	if !exists {
		return fmt.Errorf("Tree %v does not exist", id)
	}

	delete(trees, id)

	return nil
}
/*
TreeConfigTemplate is the configuration which is used for newly created trees.
It has no value (nil) until client code assigns one.
*/
var TreeConfigTemplate map[string]interface{}
/*
TreeCertTemplate is the certificate which is used for newly created trees.
It has no value (nil) until client code assigns one.
*/
var TreeCertTemplate *tls.Certificate
/*
registered is the map of all registered endpoint handlers (URL to factory
function). RegisterRestEndpoints adds entries to it.
*/
var registered = map[string]RestEndpointInst{}
/*
HandleFunc is the function used by RegisterRestEndpoints for registering
handlers. The default is http.HandleFunc.
*/
var HandleFunc = http.HandleFunc
/*
RegisterRestEndpoints registers all given REST endpoint handlers.

Every URL / factory pair is stored in the registered map and a dispatching
function is registered for the URL via HandleFunc. For each request the
dispatching function creates a new handler instance, splits the part of the
request path which follows the endpoint URL into its "/"-separated elements
(a single trailing slash is ignored, no elements result in a nil slice) and
calls the handler method matching the request method. Request methods other
than GET, POST, PUT and DELETE are answered with 405 Method Not Allowed.
*/
func RegisterRestEndpoints(endpointInsts map[string]RestEndpointInst) {

	for url, endpointInst := range endpointInsts {

		// Give every closure its own copy of the loop variables - before
		// Go 1.22 the loop variables are shared between iterations

		handlerURL, handlerInst := url, endpointInst

		registered[handlerURL] = handlerInst

		HandleFunc(handlerURL, func(w http.ResponseWriter, r *http.Request) {

			// Create a new handler instance

			handler := handlerInst()

			// Strip the endpoint URL from the path - TrimPrefix cannot panic
			// if a replaced HandleFunc delivers a path which is shorter than
			// the endpoint URL

			res := strings.TrimSpace(strings.TrimPrefix(r.URL.Path, handlerURL))
			res = strings.TrimSuffix(res, "/")

			var resources []string

			if res != "" {
				resources = strings.Split(res, "/")
			}

			// Handle request in appropriate method

			switch r.Method {
			case http.MethodGet:
				handler.HandleGET(w, r, resources)

			case http.MethodPost:
				handler.HandlePOST(w, r, resources)

			case http.MethodPut:
				handler.HandlePUT(w, r, resources)

			case http.MethodDelete:
				handler.HandleDELETE(w, r, resources)

			default:
				http.Error(w, http.StatusText(http.StatusMethodNotAllowed),
					http.StatusMethodNotAllowed)
			}
		})
	}
}
/*
DefaultEndpointHandler is the default endpoint handler implementation. All
of its handler methods answer with 405 Method Not Allowed. None of the
methods accesses the receiver.
*/
type DefaultEndpointHandler struct {
}

/*
HandleGET rejects a GET request with 405 Method Not Allowed.
*/
func (de *DefaultEndpointHandler) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {
	http.Error(w, http.StatusText(http.StatusMethodNotAllowed),
		http.StatusMethodNotAllowed)
}

/*
HandlePOST rejects a POST request with 405 Method Not Allowed.
*/
func (de *DefaultEndpointHandler) HandlePOST(w http.ResponseWriter, r *http.Request, resources []string) {
	http.Error(w, http.StatusText(http.StatusMethodNotAllowed),
		http.StatusMethodNotAllowed)
}

/*
HandlePUT rejects a PUT request with 405 Method Not Allowed.
*/
func (de *DefaultEndpointHandler) HandlePUT(w http.ResponseWriter, r *http.Request, resources []string) {
	http.Error(w, http.StatusText(http.StatusMethodNotAllowed),
		http.StatusMethodNotAllowed)
}

/*
HandleDELETE rejects a DELETE request with 405 Method Not Allowed.
*/
func (de *DefaultEndpointHandler) HandleDELETE(w http.ResponseWriter, r *http.Request, resources []string) {
	http.Error(w, http.StatusText(http.StatusMethodNotAllowed),
		http.StatusMethodNotAllowed)
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package api
import (
"encoding/json"
"net/http"
)
/*
SwaggerDefs is used to describe the endpoint in swagger. It adds the /about
path to the "paths" section and the generic Error object to the
"definitions" section of the given swagger document. Both sections must
already exist as maps.
*/
func (a *aboutEndpoint) SwaggerDefs(s map[string]interface{}) {

	// Properties of the about info object

	properties := map[string]interface{}{
		"api_versions": map[string]interface{}{
			"description": "List of available API versions.",
			"type":        "array",
			"items": map[string]interface{}{
				"description": "Available API version.",
				"type":        "string",
			},
		},
		"product": map[string]interface{}{
			"description": "Product name of the REST API provider.",
			"type":        "string",
		},
		"version": map[string]interface{}{
			"description": "Version of the REST API provider.",
			"type":        "string",
		},
	}

	// Possible responses of a GET request

	responses := map[string]interface{}{
		"200": map[string]interface{}{
			"description": "About info object",
			"schema": map[string]interface{}{
				"type":       "object",
				"properties": properties,
			},
		},
		"default": map[string]interface{}{
			"description": "Error response",
			"schema": map[string]interface{}{
				"$ref": "#/definitions/Error",
			},
		},
	}

	// Add query paths

	paths := s["paths"].(map[string]interface{})

	paths["/about"] = map[string]interface{}{
		"get": map[string]interface{}{
			"summary":     "Return information about the REST API provider.",
			"description": "Returns available API versions, product name and product version.",
			"produces":    []string{"application/json"},
			"responses":   responses,
		},
	}

	// Add generic error object to definition

	definitions := s["definitions"].(map[string]interface{})

	definitions["Error"] = map[string]interface{}{
		"description": "A human readable error mesage.",
		"type":        "string",
	}
}
/*
EndpointSwagger is the swagger endpoint URL (rooted). Handles swagger.json/

The value is APIRoot followed by "/swagger.json/".
*/
const EndpointSwagger = APIRoot + "/swagger.json/"
/*
SwaggerEndpointInst is the factory function of the swagger endpoint. Every
call returns a newly allocated handler object.
*/
func SwaggerEndpointInst() RestEndpointHandler {
	return new(swaggerEndpoint)
}
/*
swaggerEndpoint is the handler object for swagger operations. It embeds a
*DefaultEndpointHandler; the methods it defines itself are HandleGET and
SwaggerDefs.
*/
type swaggerEndpoint struct {
*DefaultEndpointHandler
}
/*
HandleGET returns the swagger definition of the REST API. The document is
assembled from the general sections, the definitions of this endpoint and
the definitions of one new instance of every registered endpoint; it is
written as JSON to the response.
*/
func (a *swaggerEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {

	// Add general sections

	data := make(map[string]interface{})

	data["swagger"] = "2.0"
	data["host"] = APIHost
	data["schemes"] = APISchemes
	data["basePath"] = APIRoot
	data["produces"] = []string{"application/json"}
	data["paths"] = map[string]interface{}{}
	data["definitions"] = map[string]interface{}{}

	// Go through all registered components and let them add their definitions

	a.SwaggerDefs(data)

	for _, newEndpoint := range registered {
		endpoint := newEndpoint()
		endpoint.SwaggerDefs(data)
	}

	// Write data

	w.Header().Set("content-type", "application/json; charset=utf-8")

	json.NewEncoder(w).Encode(data)
}
/*
SwaggerDefs is used to describe the endpoint in swagger. It sets the "info"
section of the given swagger document.
*/
func (a *swaggerEndpoint) SwaggerDefs(s map[string]interface{}) {

	// General application information

	info := make(map[string]interface{})

	info["title"] = "RUFS API"
	info["description"] = "Query and control the Remote Union File System."
	info["version"] = APIVersion

	s["info"] = info
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
/*
Package v1 contains Rufs REST API Version 1.
Admin control endpoint
/admin
The admin endpoint can be used for various admin tasks such as registering
new branches or mounting known branches.
A GET request to the admin endpoint returns the current tree
configuration; an object of all known branches and the current mapping:
{
branches : [ <known branches> ],
tree : [ <current mapping> ]
}
A POST request to the admin endpoint creates a new tree. The body of
the request should have the following form:
"<name>"
/admin/<tree>
A DELETE request to a particular tree will delete the tree.
/admin/<tree>/branch
A new branch can be created in an existing tree by sending a POST request
to the branch endpoint. The body of the request should have the following
form:
{
branch : <Name of the branch>,
rpc : <RPC definition of the remote branch (e.g. localhost:9020)>,
fingerprint : <Expected SSL fingerprint of the remote branch or an empty string>
}
/admin/<tree>/mapping
A new mapping can be created in an existing tree by sending a POST request
to the mapping endpoint. The body of the request should have the following
form:
{
branch : <Name of the branch>,
dir : <Tree directory of the branch root>,
writable : <Flag if the branch should handle write operations>
}
Dir listing endpoint
/dir/<tree>/<path>
The dir endpoint handles requests for the directory listing of a certain
path. A request url should be of the following form:
/dir/<tree>/<path>?recursive=<flag>&checksums=<flag>
The request can optionally include the flag parameters (value should
be 1 or 0) recursive and checksums. The recursive flag will add all
subdirectories to the listing and the checksums flag will add
checksums for all listed files.
File queries and manipulation
/file/{tree}/{path}
A GET request to a specific file will return its contents. A POST will
upload a new or overwrite an existing file. A DELETE request will delete
an existing file.
New files are expected to be uploaded using a multipart/form-data request.
When uploading a new file the form field for the file should be named
"uploadfile". The form can optionally contain a redirect field which
will issue a redirect once the file has been uploaded.
A PUT request is used to perform a file operation. The request body
should be a JSON object of the form (parameters are operation specific):
{
action : <Action to perform>,
files : <List of (full path) files which should be copied / renamed>
newname : <New name of file (when renaming)>,
newnames : <List of new file names when renaming multiple files using
the files parameter>,
destination : <Destination file when copying a single file - Destination
directory when copying multiple files using the files
parameter or syncing directories>
}
The action can either be: sync, rename, mkdir or copy. Copy and sync returns a JSON
structure containing a progress id:
{
progress_id : <Id for progress of the copy operation>
}
Progress information
/progress/<tree>/<progress id>
A GET request to the progress endpoint returns the current progress of
an ongoing operation. The result should be:
{
"item": <Currently processing item>,
"operation": <Name of operation>,
"progress": <Current progress>,
"subject": <Name of the subject on which the operation is performed>,
"total_items": <Total number of items>,
"total_progress": <Total progress>
}
Create zip files
/zip/<tree>
A POST request to the zip endpoint returns a zip file containing requested files. The
files to include must be given as a list of file names with full path in the body.
The body should be application/x-www-form-urlencoded encoded. The list should
be a JSON encoded string as value of the value files. The body should have the
following form:
files=[ "<file1>", "<file2>" ]
*/
package v1
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"devt.de/krotik/rufs"
"devt.de/krotik/rufs/api"
)
/*
EndpointAdmin is the admin endpoint URL (rooted). Handles everything
under admin/...
*/
const EndpointAdmin = api.APIRoot + APIv1 + "/admin/"
/*
AdminEndpointInst is the factory function of the admin endpoint. Every call
returns a newly allocated handler object.
*/
func AdminEndpointInst() api.RestEndpointHandler {
	return new(adminEndpoint)
}
/*
adminEndpoint is the handler object for admin operations. It embeds a
*api.DefaultEndpointHandler; the methods it defines itself are HandleGET,
HandlePOST, HandleDELETE and SwaggerDefs.
*/
type adminEndpoint struct {
*api.DefaultEndpointHandler
}
/*
HandleGET handles an admin query REST call. The response is a JSON object
which maps every tree name to the decoded configuration of the tree. If the
query parameter "refresh" names one of the trees then that tree is refreshed
before its configuration is read. An error from api.Trees is answered with
400 Bad Request.
*/
func (a *adminEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {

	trees, err := api.Trees()

	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	refreshName := r.URL.Query().Get("refresh")

	data := make(map[string]interface{}, len(trees))

	for name, tree := range trees {

		if name == refreshName && refreshName != "" {
			tree.Refresh()
		}

		// A configuration which cannot be decoded results in a null entry

		var conf map[string]interface{}

		json.Unmarshal([]byte(tree.Config()), &conf)

		data[name] = conf
	}

	// Write data

	w.Header().Set("content-type", "application/json; charset=utf-8")

	json.NewEncoder(w).Encode(data)
}
/*
HandlePOST handles REST calls to create a new tree, to add a branch to a
tree or to add a mapping to a tree.

Without resources the body must be a non-empty JSON string which is used as
name for a new tree. Otherwise exactly two resources are required: the name
of an existing tree and a section which must be either "branch" or
"mapping"; the body must then be a JSON object with the definition of the
branch or mapping. Any other section is answered with 400 Bad Request, as
are all other failures.
*/
func (a *adminEndpoint) HandlePOST(w http.ResponseWriter, r *http.Request, resources []string) {

	if len(resources) == 0 {
		a.createTree(w, r)
		return
	}

	if !checkResources(w, resources, 2, 2, "Need a tree name and a section (either branches or mapping)") {
		return
	}

	tree, ok, err := api.GetTree(resources[0])

	if err == nil && !ok {
		err = fmt.Errorf("Unknown tree: %v", resources[0])
	}

	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	var data map[string]interface{}

	if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
		http.Error(w, fmt.Sprintf("Could not decode request body: %v", err.Error()),
			http.StatusBadRequest)
		return
	}

	switch resources[1] {

	case "branch":
		a.addBranch(w, tree, data)

	case "mapping":
		a.addMapping(w, tree, data)

	default:

		// Do not answer an unknown section with a silent success

		http.Error(w, fmt.Sprintf("Unknown section: %v (expected branch or mapping)", resources[1]),
			http.StatusBadRequest)
	}
}

/*
createTree creates a new tree from the tree templates of the api package and
registers it under the name given in the request body.
*/
func (a *adminEndpoint) createTree(w http.ResponseWriter, r *http.Request) {
	var name string

	if err := json.NewDecoder(r.Body).Decode(&name); err != nil {
		http.Error(w, fmt.Sprintf("Could not decode request body: %v", err.Error()),
			http.StatusBadRequest)
		return
	}

	if name == "" {
		http.Error(w, "Body must contain the tree name as a non-empty JSON string",
			http.StatusBadRequest)
		return
	}

	// Create a new tree

	tree, err := rufs.NewTree(api.TreeConfigTemplate, api.TreeCertTemplate)

	if err != nil {
		http.Error(w, fmt.Sprintf("Could not create new tree: %v", err.Error()),
			http.StatusBadRequest)
		return
	}

	// Store the new tree

	if err := api.AddTree(name, tree); err != nil {
		http.Error(w, fmt.Sprintf("Could not add new tree: %v", err.Error()),
			http.StatusBadRequest)
	}
}

/*
addBranch adds a new branch to a tree. The values "rpc", "branch" and
"fingerprint" are read from data via getMapValue (in this order); processing
stops at the first value which getMapValue does not return.
*/
func (a *adminEndpoint) addBranch(w http.ResponseWriter, tree *rufs.Tree, data map[string]interface{}) {

	rpc, ok := getMapValue(w, data, "rpc")
	if !ok {
		return
	}

	branch, ok := getMapValue(w, data, "branch")
	if !ok {
		return
	}

	fingerprint, ok := getMapValue(w, data, "fingerprint")
	if !ok {
		return
	}

	if err := tree.AddBranch(branch, rpc, fingerprint); err != nil {
		http.Error(w, fmt.Sprintf("Could not add branch: %v", err.Error()),
			http.StatusBadRequest)
	}
}

/*
addMapping adds a new mapping to a tree. The values "dir", "branch" and the
writable flag are read from data via getMapValue (in this order); processing
stops at the first value which getMapValue does not return. The flag is
accepted under the key "writeable" as well as under the documented key
"writable" and must be parsable as a boolean.
*/
func (a *adminEndpoint) addMapping(w http.ResponseWriter, tree *rufs.Tree, data map[string]interface{}) {

	// A missing dir is handled like any other missing value.
	// NOTE(review): this assumes that getMapValue reports a missing key to
	// the client - confirm against its implementation.

	dir, ok := getMapValue(w, data, "dir")
	if !ok {
		return
	}

	branch, ok := getMapValue(w, data, "branch")
	if !ok {
		return
	}

	// The flag has always been read as "writeable" while the API
	// documentation names it "writable" - accept both spellings

	flagKey := "writeable"

	if _, has := data[flagKey]; !has {
		if _, has := data["writable"]; has {
			flagKey = "writable"
		}
	}

	writeableStr, ok := getMapValue(w, data, flagKey)
	if !ok {
		return
	}

	writeable, err := strconv.ParseBool(writeableStr)

	if err != nil {
		http.Error(w, fmt.Sprintf("Writeable value must be a boolean: %v", err.Error()),
			http.StatusBadRequest)
		return
	}

	if err := tree.AddMapping(dir, branch, writeable); err != nil {
		http.Error(w, fmt.Sprintf("Could not add mapping: %v", err.Error()),
			http.StatusBadRequest)
	}
}
/*
HandleDELETE handles REST calls to delete an existing tree. Exactly one
resource - the name of the tree - is required. A failure to remove the tree
is answered with 400 Bad Request.
*/
func (a *adminEndpoint) HandleDELETE(w http.ResponseWriter, r *http.Request, resources []string) {

	if !checkResources(w, resources, 1, 1, "Need a tree name") {
		return
	}

	// Delete the tree

	err := api.RemoveTree(resources[0])

	if err == nil {
		return
	}

	http.Error(w, "Could not remove tree: "+err.Error(), http.StatusBadRequest)
}
/*
SwaggerDefs is used to describe the endpoint in swagger. It adds the paths
/v1/admin, /v1/admin/{tree}, /v1/admin/{tree}/branch and
/v1/admin/{tree}/mapping to the "paths" section and the generic Error object
to the "definitions" section of the given swagger document. Both sections
must already exist as maps.
*/
func (a *adminEndpoint) SwaggerDefs(s map[string]interface{}) {

	// Builders for the recurring parts of the definitions - each call returns
	// a fresh object so that nothing is shared between the paths

	responses := func(okDescription string) map[string]interface{} {
		return map[string]interface{}{
			"200": map[string]interface{}{
				"description": okDescription,
			},
			"default": map[string]interface{}{
				"description": "Error response",
				"schema": map[string]interface{}{
					"$ref": "#/definitions/Error",
				},
			},
		}
	}

	emptyBodyResponses := func() map[string]interface{} {
		return responses("Returns an empty body if successful.")
	}

	treeParam := func() map[string]interface{} {
		return map[string]interface{}{
			"name":        "tree",
			"in":          "path",
			"description": "Name of the tree.",
			"required":    true,
			"type":        "string",
		}
	}

	bodyParam := func(description string, schema map[string]interface{}) map[string]interface{} {
		return map[string]interface{}{
			"name":        "data",
			"in":          "body",
			"description": description,
			"required":    true,
			"schema":      schema,
		}
	}

	stringProperty := func(description string) map[string]interface{} {
		return map[string]interface{}{
			"description": description,
			"type":        "string",
		}
	}

	objectSchema := func(properties map[string]interface{}) map[string]interface{} {
		return map[string]interface{}{
			"type":       "object",
			"properties": properties,
		}
	}

	paths := s["paths"].(map[string]interface{})

	// Query and creation of trees

	paths["/v1/admin"] = map[string]interface{}{
		"get": map[string]interface{}{
			"summary":     "Return all current tree configurations.",
			"description": "All current tree configurations; each object has a list of all known branches and the current mapping.",
			"produces":    []string{"text/plain", "application/json"},
			"parameters": []map[string]interface{}{
				{
					"name":        "refresh",
					"in":          "query",
					"description": "Refresh a particular tree (reload branches and mappings).",
					"required":    false,
					"type":        "string",
				},
			},
			"responses": responses("A key-value map of tree name to tree configuration"),
		},
		"post": map[string]interface{}{
			"summary":     "Create a new tree.",
			"description": "Create a new named tree.",
			"consumes":    []string{"application/json"},
			"produces":    []string{"text/plain"},
			"parameters": []map[string]interface{}{
				bodyParam("Name of the new tree.", map[string]interface{}{
					"type": "string",
				}),
			},
			"responses": emptyBodyResponses(),
		},
	}

	// Deletion of trees

	paths["/v1/admin/{tree}"] = map[string]interface{}{
		"delete": map[string]interface{}{
			"summary":     "Delete a tree.",
			"description": "Delete a named tree.",
			"produces":    []string{"text/plain"},
			"parameters": []map[string]interface{}{
				treeParam(),
			},
			"responses": emptyBodyResponses(),
		},
	}

	// Creation of branches

	paths["/v1/admin/{tree}/branch"] = map[string]interface{}{
		"post": map[string]interface{}{
			"summary":     "Add a new branch.",
			"description": "Add a new remote branch to the tree.",
			"consumes":    []string{"application/json"},
			"produces":    []string{"text/plain"},
			"parameters": []map[string]interface{}{
				treeParam(),
				bodyParam("Definition of the new branch.", objectSchema(map[string]interface{}{
					"branch":      stringProperty("Name of the remote branch (must match on the remote branch)."),
					"rpc":         stringProperty("RPC definition of the remote branch (e.g. localhost:9020)."),
					"fingerprint": stringProperty("Expected SSL fingerprint of the remote branch (shown during startup) or an empty string."),
				})),
			},
			"responses": emptyBodyResponses(),
		},
	}

	// Creation of mappings

	paths["/v1/admin/{tree}/mapping"] = map[string]interface{}{
		"post": map[string]interface{}{
			"summary":     "Add a new mapping.",
			"description": "Add a new mapping to the tree.",
			"consumes":    []string{"application/json"},
			"produces":    []string{"text/plain"},
			"parameters": []map[string]interface{}{
				treeParam(),
				bodyParam("Definition of the new branch.", objectSchema(map[string]interface{}{
					"branch":   stringProperty("Name of the known remote branch."),
					"dir":      stringProperty("Tree directory which should hold the branch root."),
					"writable": stringProperty("Flag if the branch should be mapped as writable."),
				})),
			},
			"responses": emptyBodyResponses(),
		},
	}

	// Add generic error object to definition

	definitions := s["definitions"].(map[string]interface{})

	definitions["Error"] = stringProperty("A human readable error mesage.")
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package v1
import (
"encoding/json"
"fmt"
"net/http"
"os"
"path"
"strconv"
"devt.de/krotik/common/stringutil"
"devt.de/krotik/rufs"
"devt.de/krotik/rufs/api"
)
/*
EndpointDir is the dir endpoint URL (rooted). Handles everything
under dir/...

The value is api.APIRoot followed by APIv1 and "/dir/".
*/
const EndpointDir = api.APIRoot + APIv1 + "/dir/"
/*
DirEndpointInst is the factory function of the dir endpoint. Every call
returns a newly allocated handler object.
*/
func DirEndpointInst() api.RestEndpointHandler {
	return new(dirEndpoint)
}
/*
dirEndpoint is the handler object for dir operations. It embeds a
*api.DefaultEndpointHandler; the methods it defines itself are HandleGET and
SwaggerDefs.
*/
type dirEndpoint struct {
*api.DefaultEndpointHandler
}
/*
HandleGET handles a dir query REST call. The first resource is the name of
the tree, all further resources form the directory path. The query
parameters "glob", "recursive" and "checksums" are passed on to the
directory listing; flags which cannot be parsed as boolean count as false.

The response is a JSON object which maps every listed directory to a list
of its entries. Each entry has the keys "name", "size" and "isdir" as well
as "checksum" if checksums were requested. All failures are answered with
400 Bad Request.
*/
func (d *dirEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {
	var listing [][]os.FileInfo

	if len(resources) == 0 {
		http.Error(w, "Need at least a tree name",
			http.StatusBadRequest)
		return
	}

	badRequest := func(err error) {
		http.Error(w, err.Error(), http.StatusBadRequest)
	}

	// Lookup the tree

	tree, ok, err := api.GetTree(resources[0])

	if err != nil {
		badRequest(err)
		return
	} else if !ok {
		badRequest(fmt.Errorf("Unknown tree: %v", resources[0]))
		return
	}

	// Evaluate the query parameters

	query := r.URL.Query()

	recursive, _ := strconv.ParseBool(query.Get("recursive"))
	checksums, _ := strconv.ParseBool(query.Get("checksums"))

	rex, err := stringutil.GlobToRegex(query.Get("glob"))

	if err != nil {
		badRequest(err)
		return
	}

	// Read the directory

	dirs, listing, err := tree.Dir(path.Join(resources[1:]...), rex, recursive, checksums)

	if err != nil {
		badRequest(err)
		return
	}

	// Assemble the result - a directory without entries keeps a nil list

	data := make(map[string]interface{}, len(dirs))

	for i, dir := range dirs {
		var entries []map[string]interface{}

		for _, info := range listing[i] {

			entry := map[string]interface{}{
				"name":  info.Name(),
				"size":  info.Size(),
				"isdir": info.IsDir(),
			}

			if checksums {
				entry["checksum"] = info.(*rufs.FileInfo).Checksum()
			}

			entries = append(entries, entry)
		}

		data[dir] = entries
	}

	// Write data

	w.Header().Set("content-type", "application/json; charset=utf-8")

	json.NewEncoder(w).Encode(data)
}
/*
SwaggerDefs is used to describe the endpoint in swagger. It adds the path
/v1/dir/{tree}/{path} to the "paths" section and the generic Error object to
the "definitions" section of the given swagger document. Both sections must
already exist as maps.
*/
func (d *dirEndpoint) SwaggerDefs(s map[string]interface{}) {

	// Builder for a single request parameter

	param := func(name string, in string, description string, required bool, paramType string) map[string]interface{} {
		return map[string]interface{}{
			"name":        name,
			"in":          in,
			"description": description,
			"required":    required,
			"type":        paramType,
		}
	}

	parameters := []map[string]interface{}{
		param("tree", "path", "Name of the tree.", true, "string"),
		param("path", "path", "Directory path.", true, "string"),
		param("recursive", "query", "Add listings of subdirectories.", false, "boolean"),
		param("checksums", "query", "Include file checksums.", false, "boolean"),
	}

	responses := map[string]interface{}{
		"200": map[string]interface{}{
			"description": "Returns a map of directories with a list of files as values.",
		},
		"default": map[string]interface{}{
			"description": "Error response",
			"schema": map[string]interface{}{
				"$ref": "#/definitions/Error",
			},
		},
	}

	// Add query path

	paths := s["paths"].(map[string]interface{})

	paths["/v1/dir/{tree}/{path}"] = map[string]interface{}{
		"get": map[string]interface{}{
			"summary":     "Read a directory.",
			"description": "List the contents of a directory.",
			"produces":    []string{"text/plain", "application/json"},
			"parameters":  parameters,
			"responses":   responses,
		},
	}

	// Add generic error object to definition

	definitions := s["definitions"].(map[string]interface{})

	definitions["Error"] = map[string]interface{}{
		"description": "A human readable error mesage.",
		"type":        "string",
	}
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package v1
import (
"encoding/json"
"fmt"
"mime/multipart"
"net/http"
"path"
"time"
"devt.de/krotik/common/cryptutil"
"devt.de/krotik/common/datautil"
"devt.de/krotik/common/errorutil"
"devt.de/krotik/common/httputil"
"devt.de/krotik/rufs"
"devt.de/krotik/rufs/api"
)
// Progress endpoint
// =================
/*
Progress is a persisted data structure which contains the current
progress of an ongoing operation.
*/
/*
Progress is a data structure which contains the current progress of an
ongoing operation. The progress endpoint looks up objects of this type in
ProgressMap and returns them via JSONString.
*/
type Progress struct {
Op string // Operation which we show progress of
Subject string // Subject on which the operation is performed
Progress int64 // Current progress of the ongoing operation (this is reset for each item)
TotalProgress int64 // Total progress required until current operation is finished
Item int64 // Current processing item
TotalItems int64 // Total number of items to process
Errors []string // Any error messages
}
/*
JSONString returns the progress object as an indented JSON string. The
object has the keys "operation", "subject", "progress", "total_progress",
"item", "total_items" and "errors". A failing encoding is passed to
errorutil.AssertOk.
*/
func (p *Progress) JSONString() []byte {

	fields := make(map[string]interface{})

	fields["operation"] = p.Op
	fields["subject"] = p.Subject
	fields["progress"] = p.Progress
	fields["total_progress"] = p.TotalProgress
	fields["item"] = p.Item
	fields["total_items"] = p.TotalItems
	fields["errors"] = p.Errors

	encoded, err := json.MarshalIndent(fields, "", " ")

	errorutil.AssertOk(err)

	return encoded
}
/*
ProgressMap contains information about copy progress. The progress endpoint
looks up entries under the key "<tree name>#<progress id>" and expects the
values to be of type *Progress.

NOTE(review): the arguments of NewMapCache are presumably the maximum number
of entries (100) and the maximum age of an entry (0) - confirm against
datautil.
*/
var ProgressMap = datautil.NewMapCache(100, 0)
/*
EndpointProgress is the progress endpoint URL (rooted). Handles everything
under progress/...

The value is api.APIRoot followed by APIv1 and "/progress/".
*/
const EndpointProgress = api.APIRoot + APIv1 + "/progress/"
/*
ProgressEndpointInst is the factory function of the progress endpoint. Every
call returns a newly allocated handler object.
*/
func ProgressEndpointInst() api.RestEndpointHandler {
	return new(progressEndpoint)
}
/*
progressEndpoint is the handler object for progress operations. It embeds a
*api.DefaultEndpointHandler; the methods it defines itself are HandleGET and
SwaggerDefs.
*/
type progressEndpoint struct {
*api.DefaultEndpointHandler
}
/*
HandleGET handles a progress query REST call. The first resource must be
the name of an existing tree and the second resource a progress ID. The
progress object is looked up in ProgressMap under the key
"<tree name>#<progress id>" and returned as JSON.

A missing resource, an unknown tree and an unknown progress ID are answered
with 400 Bad Request.
*/
func (f *progressEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {
	var ok bool
	var err error

	if len(resources) < 2 {
		http.Error(w, "Need a tree name and a progress ID",
			http.StatusBadRequest)
		return
	}

	// The tree itself is not needed - it only has to exist

	if _, ok, err = api.GetTree(resources[0]); err == nil && !ok {
		err = fmt.Errorf("Unknown tree: %v", resources[0])
	}

	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	p, ok := ProgressMap.Get(resources[0] + "#" + resources[1])

	if !ok {
		http.Error(w, fmt.Sprintf("Unknown progress ID: %v", resources[1]),
			http.StatusBadRequest)
		return
	}

	// The body is a JSON document - declare it like the other JSON
	// endpoints do instead of as an opaque byte stream

	w.Header().Set("content-type", "application/json; charset=utf-8")

	w.Write(p.(*Progress).JSONString())
}
/*
SwaggerDefs is used to describe the endpoint in swagger. It adds the path
/v1/progress/{tree}/{progress_id} to the "paths" section and the generic
Error object to the "definitions" section of the given swagger document.
Both sections must already exist as maps.
*/
func (f *progressEndpoint) SwaggerDefs(s map[string]interface{}) {

	// Builder for a required path parameter of type string

	pathParam := func(name string, description string) map[string]interface{} {
		return map[string]interface{}{
			"name":        name,
			"in":          "path",
			"description": description,
			"required":    true,
			"type":        "string",
		}
	}

	responses := map[string]interface{}{
		"200": map[string]interface{}{
			"description": "Returns the requested progress object.",
		},
		"default": map[string]interface{}{
			"description": "Error response",
			"schema": map[string]interface{}{
				"$ref": "#/definitions/Error",
			},
		},
	}

	// Add query path

	paths := s["paths"].(map[string]interface{})

	paths["/v1/progress/{tree}/{progress_id}"] = map[string]interface{}{
		"get": map[string]interface{}{
			"summary":     "Request progress update.",
			"description": "Return a progress object showing the progress of an ongoing operation.",
			"produces":    []string{"text/plain", "application/json"},
			"parameters": []map[string]interface{}{
				pathParam("tree", "Name of the tree."),
				pathParam("progress_id", "Id of progress object."),
			},
			"responses": responses,
		},
	}

	// Add generic error object to definition

	definitions := s["definitions"].(map[string]interface{})

	definitions["Error"] = map[string]interface{}{
		"description": "A human readable error mesage.",
		"type":        "string",
	}
}
// File endpoint
// =============
/*
EndpointFile is the file endpoint URL (rooted). Handles everything
under file/...

The value is api.APIRoot followed by APIv1 and "/file/".
*/
const EndpointFile = api.APIRoot + APIv1 + "/file/"
/*
FileEndpointInst is the factory function of the file endpoint. Every call
returns a newly allocated handler object.
*/
func FileEndpointInst() api.RestEndpointHandler {
	return new(fileEndpoint)
}
/*
fileEndpoint is the handler object for file operations. It embeds a
*api.DefaultEndpointHandler.
*/
type fileEndpoint struct {
*api.DefaultEndpointHandler
}
/*
HandleGET handles a file query REST call. The first resource is the name of
the tree, all further resources form the path of the file. The file is
written to the response with the content type application/octet-stream.

A missing resource, an unknown tree and a failure to read the file are
answered with 400 Bad Request.
*/
func (f *fileEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {

	if len(resources) < 2 {
		http.Error(w, "Need a tree name and a file path",
			http.StatusBadRequest)
		return
	}

	// Lookup the tree

	tree, ok, err := api.GetTree(resources[0])

	if err == nil && !ok {
		err = fmt.Errorf("Unknown tree: %v", resources[0])
	}

	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Write the file into the response

	filePath := path.Join(resources[1:]...)

	w.Header().Set("content-type", "application/octet-stream")

	if err := tree.ReadFileToBuffer(filePath, w); err != nil {
		http.Error(w, fmt.Sprintf("Could not read file %v: %v", filePath, err.Error()),
			http.StatusBadRequest)
	}
}
/*
HandlePUT handles REST calls to modify / copy existing files. The request is
passed on to handleFileOp with the request type "PUT".
*/
func (f *fileEndpoint) HandlePUT(w http.ResponseWriter, r *http.Request, resources []string) {
f.handleFileOp("PUT", w, r, resources)
}
/*
HandleDELETE handles REST calls to delete existing files. The request is
passed on to handleFileOp with the request type "DELETE".
*/
func (f *fileEndpoint) HandleDELETE(w http.ResponseWriter, r *http.Request, resources []string) {
f.handleFileOp("DELETE", w, r, resources)
}
/*
handleFileOp executes the operation behind a PUT or DELETE request.

requestType is either "PUT" or "DELETE". The first entry of resources is the
name of the tree, all further entries form the path of the target item. If only
a tree name is given then the root path of the tree is used.

A DELETE request removes the addressed item or, if the request body contains a
JSON list of paths, every item of that list. A PUT request needs a JSON object
in the body which contains an "action" (rename, mkdir, copy or sync) and the
parameters of that action. Copy and sync are started in the background; the
response object contains a "progress_id" under which a Progress object was
stored in ProgressMap.

All failures are reported as a bad request. On success the result object is
written as JSON.
*/
func (f *fileEndpoint) handleFileOp(requestType string, w http.ResponseWriter, r *http.Request, resources []string) {
	var action string
	var data, ret map[string]interface{}
	var tree *rufs.Tree
	var ok bool
	var err error
	var files []string

	if len(resources) < 1 {
		http.Error(w, "Need a tree name and a file path",
			http.StatusBadRequest)
		return
	} else if len(resources) == 1 {

		// Only a tree name was given - operate on the root of the tree

		resources = append(resources, "/")
	}

	if tree, ok, err = api.GetTree(resources[0]); err == nil && !ok {
		err = fmt.Errorf("Unknown tree: %v", resources[0])
	}

	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	ret = make(map[string]interface{})

	if requestType == "DELETE" {

		// See if the request contains a body with a list of files - the
		// error is deliberately not checked here: if no list could be
		// decoded the item from the URL is deleted and err is overwritten

		err = json.NewDecoder(r.Body).Decode(&files)

	} else {

		// Unless it is a delete request we need an action command

		err = json.NewDecoder(r.Body).Decode(&data)

		if err != nil {
			http.Error(w, fmt.Sprintf("Could not decode request body: %v", err.Error()),
				http.StatusBadRequest)
			return
		}

		actionObj, ok := data["action"]
		if !ok {
			http.Error(w, "Action command is missing from request body",
				http.StatusBadRequest)
			return
		}

		action = fmt.Sprint(actionObj)
	}

	// Build the rooted path of the target item and split it into the
	// containing directory and the item name

	fullPath := path.Join(resources[1:]...)
	if fullPath != "/" {
		fullPath = "/" + fullPath
	}

	dir, file := path.Split(fullPath)

	switch {

	case requestType == "DELETE":

		if len(files) == 0 {

			_, err = tree.ItemOp(dir, map[string]string{
				rufs.ItemOpAction: rufs.ItemOpActDelete,
				rufs.ItemOpName:   file,
			})

		} else {

			// Delete the files given in the body - stop at the first error

			for _, item := range files {
				itemDir, itemName := path.Split(item)

				if err == nil {
					_, err = tree.ItemOp(itemDir, map[string]string{
						rufs.ItemOpAction: rufs.ItemOpActDelete,
						rufs.ItemOpName:   itemName,
					})
				}
			}
		}

	case action == "rename":

		if newNamesParam, ok := data["newnames"]; ok {

			// Rename a list of files

			if newNames, ok := newNamesParam.([]interface{}); !ok {
				err = fmt.Errorf("Parameter newnames must be a list of filenames")
			} else if filesParam, ok := data["files"]; !ok {
				err = fmt.Errorf("Parameter files is missing from request body")
			} else if filesList, ok := filesParam.([]interface{}); !ok {
				err = fmt.Errorf("Parameter files must be a list of files")
			} else if len(newNames) < len(filesList) {

				// Without this check the lookup of the new name below
				// would panic for a too short list of new names

				err = fmt.Errorf("Parameter newnames must contain a new name for each entry of files")

			} else {

				for i, item := range filesList {
					itemDir, itemName := path.Split(fmt.Sprint(item))

					if err == nil {
						_, err = tree.ItemOp(itemDir, map[string]string{
							rufs.ItemOpAction:  rufs.ItemOpActRename,
							rufs.ItemOpName:    itemName,
							rufs.ItemOpNewName: fmt.Sprint(newNames[i]),
						})
					}
				}
			}

		} else {

			// Rename the single item from the URL

			newName, ok := data["newname"]

			if !ok {
				err = fmt.Errorf("Parameter newname is missing from request body")
			} else {
				_, err = tree.ItemOp(dir, map[string]string{
					rufs.ItemOpAction:  rufs.ItemOpActRename,
					rufs.ItemOpName:    file,
					rufs.ItemOpNewName: fmt.Sprint(newName),
				})
			}
		}

	case action == "mkdir":

		_, err = tree.ItemOp(dir, map[string]string{
			rufs.ItemOpAction: rufs.ItemOpActMkDir,
			rufs.ItemOpName:   file,
		})

	case action == "copy":

		dest, ok := data["destination"]

		if !ok {
			err = fmt.Errorf("Parameter destination is missing from request body")
		} else {

			// Create file list - either from the files parameter or from
			// the path in the URL

			filesParam, hasFilesParam := data["files"]

			if hasFilesParam {

				if lf, ok := filesParam.([]interface{}); !ok {
					err = fmt.Errorf("Parameter files must be a list of files")
				} else {
					files = make([]string, len(lf))
					for i, item := range lf {
						files[i] = fmt.Sprint(item)
					}
				}

			} else {

				files = []string{fullPath}
			}

			if err == nil {

				// Create progress object

				uuid := fmt.Sprintf("%x", cryptutil.GenerateUUID())
				ret["progress_id"] = uuid

				mapLookup := resources[0] + "#" + uuid

				ProgressMap.Put(mapLookup, &Progress{
					Op:            "Copy",
					Subject:       "",
					Progress:      0,
					TotalProgress: 0,
					Item:          0,
					TotalItems:    int64(len(files)),
					Errors:        []string{},
				})

				// NOTE: err is shared with the goroutine on purpose so
				// that a failure within the wait period below is reported
				// in the response. The access is not synchronized.

				go func() {
					err = tree.Copy(files, fmt.Sprint(dest),
						func(file string, writtenBytes, totalBytes, currentFile, totalFiles int64) {

							if p, ok := ProgressMap.Get(mapLookup); ok && writtenBytes > 0 {
								p.(*Progress).Subject = file
								p.(*Progress).Progress = writtenBytes
								p.(*Progress).TotalProgress = totalBytes
								p.(*Progress).Item = currentFile
								p.(*Progress).TotalItems = totalFiles
							}
						})

					if err != nil {
						if p, ok := ProgressMap.Get(mapLookup); ok {
							p.(*Progress).Errors = append(p.(*Progress).Errors, err.Error())
						}
					}
				}()

				// Wait a little bit so immediate errors are directly reported

				time.Sleep(10 * time.Millisecond)
			}
		}

	case action == "sync":

		dest, ok := data["destination"]

		if !ok {
			err = fmt.Errorf("Parameter destination is missing from request body")
		} else {

			// Create progress object

			uuid := fmt.Sprintf("%x", cryptutil.GenerateUUID())
			ret["progress_id"] = uuid

			mapLookup := resources[0] + "#" + uuid

			ProgressMap.Put(mapLookup, &Progress{
				Op:            "Sync",
				Subject:       "",
				Progress:      0,
				TotalProgress: -1,
				Item:          0,
				TotalItems:    -1,
				Errors:        []string{},
			})

			// NOTE: err is shared with the goroutine on purpose so that
			// a failure within the wait period below is reported in the
			// response. The access is not synchronized.

			go func() {
				err = tree.Sync(fullPath, fmt.Sprint(dest), true,
					func(op, srcFile, dstFile string, writtenBytes, totalBytes, currentFile, totalFiles int64) {

						if p, ok := ProgressMap.Get(mapLookup); ok && writtenBytes > 0 {
							p.(*Progress).Op = op
							p.(*Progress).Subject = srcFile
							p.(*Progress).Progress = writtenBytes
							p.(*Progress).TotalProgress = totalBytes
							p.(*Progress).Item = currentFile
							p.(*Progress).TotalItems = totalFiles
						}
					})

				if err != nil {
					if p, ok := ProgressMap.Get(mapLookup); ok {
						p.(*Progress).Errors = append(p.(*Progress).Errors, err.Error())
					}
				}
			}()

			// Wait a little bit so immediate errors are directly reported

			time.Sleep(10 * time.Millisecond)
		}

	default:

		err = fmt.Errorf("Unknown action: %v", action)
	}

	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Write data

	w.Header().Set("content-type", "application/json; charset=utf-8")
	json.NewEncoder(w).Encode(ret)
}
/*
HandlePOST handles REST calls to create or overwrite a new file. The first
resource is the name of the tree, all following resources form the directory
into which the uploaded files are written. The files are expected as
multipart form data in the form field "uploadfile". If the form contains a
"redirect" value then the client is redirected to it after the upload; only
redirects accepted by httputil.CheckLocalRedirect are followed. All failures
are reported as a bad request.
*/
func (f *fileEndpoint) HandlePOST(w http.ResponseWriter, r *http.Request, resources []string) {
	var err error
	var tree *rufs.Tree
	var ok bool

	if len(resources) < 1 {
		http.Error(w, "Need a tree name and a file path",
			http.StatusBadRequest)
		return
	}

	if tree, ok, err = api.GetTree(resources[0]); err == nil && !ok {
		err = fmt.Errorf("Unknown tree: %v", resources[0])
	}

	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Check we have the right request type

	if r.MultipartForm == nil {
		if err = r.ParseMultipartForm(32 << 20); err != nil {
			http.Error(w, fmt.Sprintf("Could not read request body: %v", err.Error()),
				http.StatusBadRequest)
			return
		}
	}

	if r.MultipartForm != nil && r.MultipartForm.File != nil {

		// Check the files are in the form field uploadfile

		files, ok := r.MultipartForm.File["uploadfile"]
		if !ok {
			http.Error(w, "Could not find 'uploadfile' form field",
				http.StatusBadRequest)
			return
		}

		for _, file := range files {
			var src multipart.File

			// Write out all send files

			if src, err = file.Open(); err == nil {
				err = tree.WriteFileFromBuffer(path.Join(path.Join(resources[1:]...), file.Filename), src)

				// Release the upload once it was written - the handle is
				// only read from so a close error carries no information

				src.Close()
			}

			if err != nil {
				http.Error(w, fmt.Sprintf("Could not write file %v: %v", path.Join(resources[1:]...)+file.Filename, err.Error()),
					http.StatusBadRequest)
				return
			}
		}
	}

	if redirect := r.PostFormValue("redirect"); redirect != "" {

		// Do the redirect - make sure it is a local redirect

		if err = httputil.CheckLocalRedirect(redirect); err != nil {
			http.Error(w, fmt.Sprintf("Could not redirect: %v", err.Error()),
				http.StatusBadRequest)
			return
		}

		http.Redirect(w, r, redirect, http.StatusFound)
	}
}
/*
SwaggerDefs is used to describe the endpoint in swagger. It adds the
description of the path /v1/file/{tree}/{path} with its get, put, post and
delete operations to the "paths" object of the given swagger object and
registers the generic Error object in its "definitions" object. Both objects
must already exist in s.
*/
func (f *fileEndpoint) SwaggerDefs(s map[string]interface{}) {

	// The helpers below build the fragments which are repeated for every
	// operation. Each call returns a new map so that the operations never
	// share any state.

	pathParam := func(name string, description string) map[string]interface{} {
		return map[string]interface{}{
			"name":        name,
			"in":          "path",
			"description": description,
			"required":    true,
			"type":        "string",
		}
	}

	treeParam := func() map[string]interface{} {
		return pathParam("tree", "Name of the tree.")
	}

	responses := func(okDescription string) map[string]interface{} {
		return map[string]interface{}{
			"200": map[string]interface{}{
				"description": okDescription,
			},
			"default": map[string]interface{}{
				"description": "Error response",
				"schema": map[string]interface{}{
					"$ref": "#/definitions/Error",
				},
			},
		}
	}

	stringItem := func(description string) map[string]interface{} {
		return map[string]interface{}{
			"description": description,
			"type":        "string",
		}
	}

	// A successful upload is either answered directly or with a redirect

	postResponses := responses("Successful upload no redirect parameter given.")
	postResponses["302"] = map[string]interface{}{
		"description": "Successful upload - redirect according to the given redirect parameter.",
	}

	s["paths"].(map[string]interface{})["/v1/file/{tree}/{path}"] = map[string]interface{}{
		"get": map[string]interface{}{
			"summary":     "Read a file.",
			"description": "Return the contents of a file.",
			"produces": []string{
				"text/plain",
				"application/octet-stream",
			},
			"parameters": []map[string]interface{}{
				treeParam(),
				pathParam("path", "File path."),
			},
			"responses": responses("Returns the content of the requested file."),
		},
		"put": map[string]interface{}{
			"summary":     "Perform a file operation.",
			"description": "Perform a file operation like rename or copy.",
			"consumes": []string{
				"application/json",
			},
			"produces": []string{
				"text/plain",
				"application/json",
			},
			"parameters": []map[string]interface{}{
				treeParam(),
				pathParam("path", "File path."),
				{
					"name":        "operation",
					"in":          "body",
					"description": "Operation which should be executes",
					"required":    true,
					"schema": map[string]interface{}{
						"type": "object",
						"properties": map[string]interface{}{
							"action": map[string]interface{}{
								"description": "Action to perform.",
								"type":        "string",
								"enum": []string{
									"rename",
									"mkdir",
									"copy",
									"sync",
								},
							},
							"newname": stringItem("New filename when renaming a single file."),
							"newnames": map[string]interface{}{
								"description": "List of new file names when renaming multiple files using the files parameter.",
								"type":        "array",
								"items":       stringItem("New filename."),
							},
							"destination": stringItem("Destination directory when copying files."),
							"files": map[string]interface{}{
								"description": "List of (full path) files which should be copied / renamed.",
								"type":        "array",
								"items":       stringItem("File (with full path) which should be copied / renamed."),
							},
						},
					},
				},
			},

			// The handler answers with a JSON object which only has
			// content for the background operations copy and sync

			"responses": responses("Returns a JSON object. For copy and sync operations it contains the progress_id of the started operation."),
		},
		"post": map[string]interface{}{
			"summary":     "Upload a file.",
			"description": "Upload or overwrite a file.",
			"produces": []string{
				"text/plain",
			},
			"consumes": []string{
				"multipart/form-data",
			},
			"parameters": []map[string]interface{}{
				treeParam(),
				pathParam("path", "File path."),
				{
					"name":        "redirect",
					"in":          "formData",
					"description": "Page to redirect to after processing the request.",
					"required":    false,
					"type":        "string",
				},
				{
					"name":        "uploadfile",
					"in":          "formData",
					"description": "File(s) to create / overwrite.",
					"required":    true,
					"type":        "file",
				},
			},
			"responses": postResponses,
		},
		"delete": map[string]interface{}{
			"summary":     "Delete a file or directory.",
			"description": "Delete a file or directory.",
			"produces": []string{
				"text/plain",
				"application/json",
			},
			"parameters": []map[string]interface{}{
				treeParam(),
				pathParam("path", "File or directory path."),
				{
					"name":        "filelist",
					"in":          "body",
					"description": "List of (full path) files which should be deleted",
					"required":    false,
					"schema": map[string]interface{}{
						"type":  "array",
						"items": stringItem("File (with full path) which should be deleted."),
					},
				},
			},
			"responses": responses("Returns an empty JSON object after the requested item(s) have been deleted."),
		},
	}

	// Add generic error object to definition

	s["definitions"].(map[string]interface{})["Error"] = map[string]interface{}{
		"description": "A human readable error mesage.",
		"type":        "string",
	}
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package v1
import (
"fmt"
"net/http"
"strings"
"devt.de/krotik/rufs/api"
)
/*
APIv1 is the directory for version 1 of the API. It is appended to the API
root when the URLs of the version 1 endpoints are built.
*/
const APIv1 = "/v1"
/*
V1EndpointMap is a map of urls to endpoints for version 1 of the API. Each
key is the rooted URL of an endpoint and each value is the function which
creates a handler for it.
*/
var V1EndpointMap = map[string]api.RestEndpointInst{
EndpointAdmin: AdminEndpointInst,
EndpointDir: DirEndpointInst,
EndpointFile: FileEndpointInst,
EndpointProgress: ProgressEndpointInst,
EndpointZip: ZipEndpointInst,
}
// Helper functions
// ================
/*
checkResources validates the number of resources of a request. It returns
true if the number lies between requiredMin and requiredMax (inclusive).
Otherwise a bad request response is written to w and false is returned: with
errorMsg if there are too few resources or with a message listing all
resources after the first one if there are too many.
*/
func checkResources(w http.ResponseWriter, resources []string, requiredMin int, requiredMax int, errorMsg string) bool {
	count := len(resources)

	switch {

	case count < requiredMin:
		http.Error(w, errorMsg, http.StatusBadRequest)
		return false

	case count > requiredMax:
		surplus := strings.Join(resources[1:], "/")
		http.Error(w, "Invalid resource specification: "+surplus, http.StatusBadRequest)
		return false
	}

	return true
}
/*
getMapValue extracts a value from a given map and returns it as a string.
If the key is missing or its value is an empty string then a bad request
response is written to w and the second return value is false.
*/
func getMapValue(w http.ResponseWriter, data map[string]interface{}, key string) (string, bool) {
	val, found := data[key]

	if !found || val == "" {
		msg := fmt.Sprintf("Value for %v is missing in posted data", key)
		http.Error(w, msg, http.StatusBadRequest)
		return "", false
	}

	return fmt.Sprint(val), true
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package v1
import (
"archive/zip"
"bytes"
"encoding/json"
"fmt"
"net/http"
"devt.de/krotik/rufs"
"devt.de/krotik/rufs/api"
)
/*
EndpointZip is the rooted URL prefix of the zip endpoint. It is assembled
from the API root, the version 1 directory and "/zip/". The handler registered
for it serves every request below that prefix.
*/
const EndpointZip = api.APIRoot + APIv1 + "/zip/"
/*
ZipEndpointInst returns a fresh handler object for the zip endpoint.
*/
func ZipEndpointInst() api.RestEndpointHandler {
	return new(zipEndpoint)
}
/*
zipEndpoint is the handler object for zip operations. It embeds
api.DefaultEndpointHandler and defines the POST handler of the zip endpoint.
*/
type zipEndpoint struct {
*api.DefaultEndpointHandler
}
/*
HandlePOST handles a zip query REST call. The only resource is the name of
the tree. The form field "files" must contain a JSON encoded list of file
paths. The listed files are read from the tree and streamed to the client as
a single zip file.

Failures before the first byte of the zip file was written are reported as a
bad request. Once streaming has started errors can no longer be reported;
entries which cannot be created are skipped and read errors are ignored.
*/
func (z *zipEndpoint) HandlePOST(w http.ResponseWriter, r *http.Request, resources []string) {
	var tree *rufs.Tree
	var data []string
	var ok bool
	var err error

	if !checkResources(w, resources, 1, 1, "Need a tree name") {
		return
	}

	if tree, ok, err = api.GetTree(resources[0]); err == nil && !ok {
		err = fmt.Errorf("Unknown tree: %v", resources[0])
	}

	// A lookup error must stop the request - otherwise the tree would be
	// used further down even though it might be nil

	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	if err = r.ParseForm(); err == nil {
		files := r.Form["files"]

		if len(files) == 0 {
			err = fmt.Errorf("Field 'files' should be a list of files as JSON encoded string")
		} else {
			err = json.NewDecoder(bytes.NewBufferString(files[0])).Decode(&data)
		}
	}

	if err != nil {
		http.Error(w, fmt.Sprintf("Could not decode request body: %v", err.Error()),
			http.StatusBadRequest)
		return
	}

	w.Header().Set("content-type", "application/octet-stream")
	w.Header().Set("content-disposition", `attachment; filename="files.zip"`)

	// Go through the list of files and stream the zip file

	zipW := zip.NewWriter(w)

	for _, f := range data {
		writer, createErr := zipW.Create(f)

		if createErr != nil {

			// No writer is available for this entry - the response has
			// already started so the entry can only be skipped

			continue
		}

		// Best effort - a read error leaves an incomplete entry

		tree.ReadFileToBuffer(f, writer)
	}

	zipW.Close()
}
/*
SwaggerDefs is used to describe the endpoint in swagger. It adds the
description of the path /v1/zip/{tree} with its post operation to the "paths"
object of the given swagger object. The "paths" object must already exist
in s.
*/
func (z *zipEndpoint) SwaggerDefs(s map[string]interface{}) {

	s["paths"].(map[string]interface{})["/v1/zip/{tree}"] = map[string]interface{}{
		"post": map[string]interface{}{
			"summary":     "Create zip file from a list of files.",
			"description": "Combine a list of given files into a single zip file.",

			// Errors are sent as plain text, the zip file itself is sent
			// as application/octet-stream

			"produces": []string{
				"text/plain",
				"application/octet-stream",
			},
			"consumes": []string{
				"application/x-www-form-urlencoded",
			},

			// NOTE(review): the handler reads "files" from the parsed form
			// while it is declared as a body parameter here - confirm which
			// declaration the API clients rely on

			"parameters": []map[string]interface{}{
				{
					"name":        "tree",
					"in":          "path",
					"description": "Name of the tree.",
					"required":    true,
					"type":        "string",
				},
				{
					"name":        "files",
					"in":          "body",
					"description": "JSON encoded list of (full path) files which should be zipped up",
					"required":    true,
					"schema": map[string]interface{}{
						"type": "array",
						"items": map[string]interface{}{
							"description": "File (with full path) which should be included in the zip file.",
							"type":        "string",
						},
					},
				},
			},
			"responses": map[string]interface{}{
				"200": map[string]interface{}{
					"description": "Returns a zip file which contains the requested files.",
				},
				"default": map[string]interface{}{
					"description": "Error response",
					"schema": map[string]interface{}{
						"$ref": "#/definitions/Error",
					},
				},
			},
		},
	}
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
/*
Package rufs contains the main API to Rufs.
Rufs is organized as a collection of branches. Each branch represents a physical
file system structure which can be queried and updated by an authorized client.
On the client side one or several branches are organized into a tree. The
single branches can overlay each other. For example:
Branch A
/foo/A
/foo/B
/bar/C
Branch B
/foo/C
/test/D
Tree 1
/myspace => Branch A, Branch B
Accessing tree with:
/myspace/foo/A gets file /foo/A from Branch A while
/myspace/foo/C gets file /foo/C from Branch B
Write operations go only to branches which are mapped as writing branches
and who accept them (i.e. are not set to readonly on the side of the branch).
*/
package rufs
import (
"bytes"
"crypto/tls"
"encoding/gob"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"devt.de/krotik/common/errorutil"
"devt.de/krotik/common/fileutil"
"devt.de/krotik/common/pools"
"devt.de/krotik/common/stringutil"
"devt.de/krotik/rufs/config"
"devt.de/krotik/rufs/node"
)
/*
init registers the types which are transported inside interface values with
the gob package.
*/
func init() {
// Make sure we can use the relevant types in a gob operation
gob.Register([][]os.FileInfo{})
gob.Register(&FileInfo{})
}
/*
Branch models a single exported branch in Rufs. A branch makes a local
directory available through an RPC node.
*/
type Branch struct {
rootPath string // Local directory (absolute path) modeling the branch root
node *node.RufsNode // Local RPC node
readonly bool // Flag if this branch is readonly
}
/*
NewBranch returns a new exported branch. The given config is checked first,
then the RPC node of the branch is created and started and finally the
absolute root path of the branch is determined. The first error which is
encountered is returned together with a nil branch.
*/
func NewBranch(cfg map[string]interface{}, cert *tls.Certificate) (*Branch, error) {

	// Make sure the given config is ok

	if err := config.CheckBranchExportConfig(cfg); err != nil {
		return nil, err
	}

	// Create RPC server

	host := fileutil.ConfStr(cfg, config.RPCHost)
	port := fileutil.ConfStr(cfg, config.RPCPort)

	rn := node.NewNode(fmt.Sprintf("%v:%v", host, port),
		fileutil.ConfStr(cfg, config.BranchName),
		fileutil.ConfStr(cfg, config.BranchSecret), cert, nil)

	// Start the rpc server

	if err := rn.Start(cert); err != nil {
		return nil, err
	}

	// Construct root path

	rootPath, err := filepath.Abs(fileutil.ConfStr(cfg, config.LocalFolder))
	if err != nil {
		return nil, err
	}

	branch := &Branch{rootPath, rn, fileutil.ConfBool(cfg, config.EnableReadOnly)}

	rn.DataHandler = branch.requestHandler

	return branch, nil
}
/*
Name returns the name of the branch. The name is taken from the RPC node of
the branch.
*/
func (b *Branch) Name() string {
return b.node.Name()
}
/*
SSLFingerprint returns the SSL fingerprint of the branch. The fingerprint is
taken from the RPC node of the branch.
*/
func (b *Branch) SSLFingerprint() string {
return b.node.SSLFingerprint()
}
/*
Shutdown shuts the branch down by shutting down its RPC node. The error of
the node shutdown is returned.
*/
func (b *Branch) Shutdown() error {
return b.node.Shutdown()
}
/*
IsReadOnly returns if this branch is read-only. Write operations on a
read-only branch are rejected with an error.
*/
func (b *Branch) IsReadOnly() bool {
return b.readonly
}
/*
checkReadOnly returns an error which names the branch if this branch is
read-only. It returns nil for a writable branch.
*/
func (b *Branch) checkReadOnly() error {
	if !b.IsReadOnly() {
		return nil
	}

	return fmt.Errorf("Branch %v is read-only", b.Name())
}
// Branch API
// ==========
/*
Dir returns file listings matching a given pattern of one or more directories.
The contents of the given path is returned along with checksums if the checksum
flag is specified. Optionally, also the contents of all subdirectories can be
returned if the recursive flag is set. The return values is a list of traversed
directories (platform-agnostic) and their corresponding contents.

The pattern is a regular expression which is matched against the names of the
entries. An invalid pattern is returned as an error. If the requested path does
not exist then no error is returned and both result lists are nil.
*/
func (b *Branch) Dir(spath string, pattern string, recursive bool, checksums bool) ([]string, [][]os.FileInfo, error) {
var fis []os.FileInfo
// Compile pattern
re, err := regexp.Compile(pattern)
if err != nil {
return nil, nil, err
}
// Helper which filters the entries of a directory by the pattern, wraps
// them and adds checksums if requested
createRufsFileInfos := func(dirname string, afis []os.FileInfo) []os.FileInfo {
var fis []os.FileInfo
fis = make([]os.FileInfo, 0, len(afis))
for _, fi := range afis {
// Append if it matches the pattern
if re.MatchString(fi.Name()) {
fis = append(fis, fi)
}
}
// Wrap normal file infos and calculate checksum if necessary
// NOTE(review): the checksum loop below asserts the entries of fis to
// *FileInfo - presumably WrapFileInfos replaces the entries of the given
// slice in place; confirm
ret := WrapFileInfos(dirname, fis)
if checksums {
for _, fi := range fis {
if !fi.IsDir() {
// The sum is either there or not ... - access errors should
// be caught when trying to read the file
sum, _ := fileutil.CheckSumFileFast(filepath.Join(dirname, fi.Name()))
fi.(*FileInfo).FiChecksum = sum
}
}
}
return ret
}
subPath, err := b.constructSubPath(spath)
if err == nil {
if !recursive {
// Only list the requested directory itself
if fis, err = ioutil.ReadDir(subPath); err == nil {
return []string{spath},
[][]os.FileInfo{createRufsFileInfos(subPath, fis)}, nil
}
} else {
var rpaths []string
var rfis [][]os.FileInfo
var addSubDir func(string, string) error
// Recursive function to walk directories and symlinks
// in a platform-agnostic way
// p is the local path of the directory and rp is its platform-agnostic
// path which is reported to the caller. The function shares the
// variables fis and err of the enclosing function.
addSubDir = func(p string, rp string) error {
fis, err = ioutil.ReadDir(p)
if err == nil {
rpaths = append(rpaths, rp)
rfis = append(rfis, createRufsFileInfos(p, fis))
// Descend into the subdirectories until the first error occurs
for _, fi := range fis {
if err == nil && fi.IsDir() {
err = addSubDir(filepath.Join(p, fi.Name()),
path.Join(rp, fi.Name()))
}
}
}
return err
}
if err = addSubDir(subPath, spath); err == nil {
return rpaths, rfis, nil
}
}
}
// Ignore any not exists errors
if os.IsNotExist(err) {
err = nil
}
return nil, nil, err
}
/*
ReadFileToBuffer reads a complete file into a given buffer which implements
io.Writer. The file is read in chunks of DefaultReadBufferSize bytes until
the end of the file is reached. The first read or write error is returned.
*/
func (b *Branch) ReadFileToBuffer(spath string, buf io.Writer) error {
	var pos int64

	chunk := make([]byte, DefaultReadBufferSize)

	for {
		read, err := b.ReadFile(spath, chunk, pos)

		if err != nil {

			if IsEOF(err) {

				// We reached the end of the file

				return nil
			}

			return err
		}

		if _, err = buf.Write(chunk[:read]); err != nil {
			return err
		}

		pos += int64(read)
	}
}
/*
ReadFile reads up to len(p) bytes into p from the given offset. It
returns the number of bytes read (0 <= n <= len(p)) and any error
encountered. Reading a directory is an error.
*/
func (b *Branch) ReadFile(spath string, p []byte, offset int64) (int, error) {

	subPath, err := b.constructSubPath(spath)
	if err != nil {
		return 0, err
	}

	info, err := os.Stat(subPath)
	if err != nil {
		return 0, err
	}

	if info.IsDir() {
		return 0, fmt.Errorf("read /%v: is a directory", spath)
	}

	file, err := os.Open(subPath)
	if err != nil {
		return 0, err
	}
	defer file.Close()

	// Read from a section which is limited to the file size determined above

	section := io.NewSectionReader(file, 0, info.Size())

	if _, err = section.Seek(offset, io.SeekStart); err != nil {
		return 0, err
	}

	return section.Read(p)
}
/*
WriteFileFromBuffer writes a complete file from a given buffer which implements
io.Reader. The data is read in chunks of DefaultReadBufferSize bytes and
written to the file one chunk after the other. After the last chunk an empty
write is done so the file also exists if the reader did not return any data.
An error is returned if the branch is read-only or if reading or writing fails.
*/
func (b *Branch) WriteFileFromBuffer(spath string, buf io.Reader) error {
	var offset int64

	if err := b.checkReadOnly(); err != nil {
		return err
	}

	writeBuf := make([]byte, DefaultReadBufferSize)

	for {
		n, readErr := buf.Read(writeBuf)

		// A reader may return data together with an error (including the
		// end of file marker) - the data must be processed first

		if n > 0 {
			if _, err := b.WriteFile(spath, writeBuf[:n], offset); err != nil {
				return err
			}

			offset += int64(n)
		}

		if readErr != nil {

			if !IsEOF(readErr) {
				return readErr
			}

			// We reached the end of the file - the empty write makes sure
			// that the file exists even if no data was written

			_, err := b.WriteFile(spath, []byte{}, offset)

			return err
		}
	}
}
/*
WriteFile writes p into the given file from the given offset. It
returns the number of written bytes and any error encountered.

The file and its parent directories are created if they do not exist. If the
offset lies beyond the current end of the file then the gap is filled with
zero bytes. Writing to a read-only branch is an error.
*/
func (b *Branch) WriteFile(spath string, p []byte, offset int64) (int, error) {
	var n int
	var m int64

	if err := b.checkReadOnly(); err != nil {
		return 0, err
	}

	buf := byteSlicePool.Get().([]byte)
	defer func() {
		byteSlicePool.Put(buf)
	}()

	// Helper which appends n filler bytes to the given file - errors are
	// not reported, the function just stops at the first failed write

	growFile := func(f *os.File, n int64) {
		var err error

		// The pooled buffer may still hold data from an earlier use which
		// must not end up in the file

		for i := range buf {
			buf[i] = 0
		}

		toWrite := n

		for err == nil && toWrite > 0 && len(buf) > 0 {
			chunk := int64(len(buf))

			if toWrite < chunk {
				chunk = toWrite
			}

			_, err = f.Write(buf[:chunk])
			toWrite -= chunk
		}
	}

	subPath, err := b.constructSubPath(spath)

	if err == nil {
		var fi os.FileInfo
		var f *os.File

		if fi, err = os.Stat(subPath); os.IsNotExist(err) {

			// Ensure path exists

			dir, _ := filepath.Split(subPath)

			if err = os.MkdirAll(dir, 0755); err == nil {

				// Create the file newly

				if f, err = os.OpenFile(subPath, os.O_RDWR|os.O_CREATE, 0644); err == nil {
					defer f.Close()

					if offset > 0 {
						growFile(f, offset)
					}

					m, err = io.Copy(f, bytes.NewBuffer(p))
					n += int(m)
				}
			}

		} else if err == nil {

			// File does exist - the error of the open call is assigned to
			// the outer err variable so a failure is reported to the caller

			if f, err = os.OpenFile(subPath, os.O_RDWR, 0644); err == nil {
				defer f.Close()

				if fi.Size() < offset {

					// Fill the gap between the end of the file and the offset

					f.Seek(fi.Size(), io.SeekStart)
					growFile(f, offset-fi.Size())

				} else {

					f.Seek(offset, io.SeekStart)
				}

				m, err = io.Copy(f, bytes.NewBuffer(p))
				errorutil.AssertOk(err)

				n += int(m)
			}
		}
	}

	return n, err
}
/*
ItemOp parameters. These are the keys of the opdata map which is given to
ItemOp.
*/
const (
ItemOpAction = "itemop_action" // ItemOp action
ItemOpName = "itemop_name" // Item name
ItemOpNewName = "itemop_newname" // New item name
)
/*
ItemOp actions. These are the values which ItemOp handles for the
ItemOpAction parameter.
*/
const (
ItemOpActRename = "rename" // Rename a file or directory
ItemOpActDelete = "delete" // Delete a file or directory
ItemOpActMkDir = "mkdir" // Create a directory
)
/*
ItemOp executes a file or directory specific operation which can either
succeed or fail (e.g. rename or delete). Actions and parameters should
be given in the opdata map.

spath is the directory which contains the item. The item itself is named by
the ItemOpName parameter; only the last path element of a given name is used.
A delete with a name containing a wildcard (*) removes all matching items in
spath and in all of its subdirectories.

The returned flag is true if no error occurred or if the error says that the
item does not exist. Operations on a read-only branch are rejected.
*/
func (b *Branch) ItemOp(spath string, opdata map[string]string) (bool, error) {
	res := false

	if err := b.checkReadOnly(); err != nil {
		return false, err
	}

	subPath, err := b.constructSubPath(spath)

	if err == nil {
		action := opdata[ItemOpAction]

		fileFromOpData := func(key string) (string, error) {

			// Make sure we are only dealing with files

			_, name := filepath.Split(opdata[key])

			if name == "" {
				return "", fmt.Errorf("This operation requires a specific file or directory")
			}

			// Build the relative paths

			return filepath.Join(filepath.FromSlash(subPath), name), nil
		}

		switch action {

		case ItemOpActMkDir:
			var name string

			// Make directory action

			if name, err = fileFromOpData(ItemOpName); err == nil {
				err = os.MkdirAll(name, 0755)
			}

		case ItemOpActRename:
			var name, newname string

			// Rename action

			if name, err = fileFromOpData(ItemOpName); err == nil {
				if newname, err = fileFromOpData(ItemOpNewName); err == nil {
					err = os.Rename(name, newname)
				}
			}

		case ItemOpActDelete:
			var name string

			// Delete action

			if name, err = fileFromOpData(ItemOpName); err == nil {

				if strings.Contains(name, "*") {
					var rex string

					// We have a wildcard

					_, glob := filepath.Split(name)

					// Create a regex from the given glob expression

					if rex, err = stringutil.GlobToRegex(glob); err == nil {
						var dirs []string
						var fis [][]os.FileInfo

						if dirs, fis, err = b.Dir(spath, rex, true, false); err == nil {

							for i, dir := range dirs {

								// The listed directories already start
								// with spath so their local location must
								// be resolved from the branch root

								dirPath, dirErr := b.constructSubPath(dir)
								if dirErr != nil {
									continue
								}

								// Remove all files and dirs according to
								// the wildcard - this is best effort,
								// errors of single items are ignored

								for _, fi := range fis[i] {
									os.RemoveAll(filepath.Join(dirPath, fi.Name()))
								}
							}
						}
					}

				} else if ok, _ := fileutil.PathExists(name); ok {

					err = os.RemoveAll(name)

				} else {

					err = os.ErrNotExist
				}
			}
		}

		// Determine if we succeeded

		res = err == nil || os.IsNotExist(err)
	}

	return res, err
}
// Request handling functions
// ==========================
/*
DefaultReadBufferSize is the default size for file reading. It is the size in
bytes of the buffers which are used to read and write files in chunks.
*/
var DefaultReadBufferSize = 1024 * 16
/*
bufferPool holds buffers which are used to marshal objects.
*/
var bufferPool = pools.NewByteBufferPool()
/*
byteSlicePool holds buffers which are used to read files. The pool is created
with the value DefaultReadBufferSize has when the package is initialized.
*/
var byteSlicePool = pools.NewByteSlicePool(DefaultReadBufferSize)
/*
Meta parameters. These are the keys of the control map of a request which is
handled by requestHandler.
*/
const (
ParamAction = "a" // Requested action
ParamPath = "p" // Path string
ParamPattern = "x" // Pattern string
ParamRecursive = "r" // Recursive flag
ParamChecksums = "c" // Checksum flag
ParamOffset = "o" // Offset parameter
ParamSize = "s" // Size parameter
)
/*
Possible actions. These are the values which requestHandler handles for the
ParamAction parameter.
*/
const (
OpDir = "dir" // Read the contents of a path
OpRead = "read" // Read the contents of a file
OpWrite = "write" // Write the contents of a file
OpItemOp = "itemop" // File or directory operation
)
/*
requestHandler handles incoming requests from other branches or trees.

The control map ctrl contains the requested action (ParamAction) and its
parameters. data is the payload of a write request. The result of the
operation is returned gob encoded. The absolute root path of the branch is
removed from the message of a returned error.
*/
func (b *Branch) requestHandler(ctrl map[string]string, data []byte) ([]byte, error) {
	var err error
	var res interface{}
	var ret []byte

	action := ctrl[ParamAction]

	// Handle operation requests

	switch action {

	case OpDir:
		var dirs []string
		var fis [][]os.FileInfo

		dir := ctrl[ParamPath]
		pattern := ctrl[ParamPattern]
		rec := strings.ToLower(ctrl[ParamRecursive]) == "true"
		sum := strings.ToLower(ctrl[ParamChecksums]) == "true"

		if dirs, fis, err = b.Dir(dir, pattern, rec, sum); err == nil {
			res = []interface{}{dirs, fis}
		}

	case OpItemOp:
		res, err = b.ItemOp(ctrl[ParamPath], ctrl)

	case OpRead:
		var size, n int
		var offset int64

		spath := ctrl[ParamPath]

		if size, err = strconv.Atoi(ctrl[ParamSize]); err == nil {

			if size < 0 {

				// The size comes from the remote side - a negative value
				// would cause a panic when slicing the read buffer

				err = fmt.Errorf("Invalid read size: %v", size)

			} else if offset, err = strconv.ParseInt(ctrl[ParamOffset], 10, 64); err == nil {

				buf := byteSlicePool.Get().([]byte)
				defer func() {
					byteSlicePool.Put(buf)
				}()

				if len(buf) < size {

					// Constantly requesting bigger buffers will
					// eventually replace all default sized buffers

					buf = make([]byte, size)
				}

				if n, err = b.ReadFile(spath, buf[:size], offset); err == nil {
					res = []interface{}{n, buf[:size]}
				}
			}
		}

	case OpWrite:
		var offset int64

		spath := ctrl[ParamPath]

		if offset, err = strconv.ParseInt(ctrl[ParamOffset], 10, 64); err == nil {
			res, err = b.WriteFile(spath, data, offset)
		}
	}

	// Send the response

	if err == nil {

		// Allocate a new encoding buffer - no need to lock as
		// it is based on sync.Pool

		// Pooled encoding buffers are used to keep expensive buffer
		// reallocations to a minimum. It is better to allocate the
		// actual response buffer once the response size is known.

		bb := bufferPool.Get().(*bytes.Buffer)

		if err = gob.NewEncoder(bb).Encode(res); err == nil {
			toSend := bb.Bytes()

			// Allocate the response array

			ret = make([]byte, len(toSend))

			// Copy encoded result into the response array

			copy(ret, toSend)
		}

		// Return the encoding buffer back to the pool - the encoded
		// result has been copied so the buffer is no longer needed

		bb.Reset()
		bufferPool.Put(bb)
	}

	if err != nil {

		// Ensure we don't leak local paths - this might not work in
		// all situations and depends on the underlying os. In this
		// error messages might include information on the full local
		// path in error messages.

		absRoot, _ := filepath.Abs(b.rootPath)

		err = fmt.Errorf("%v", strings.Replace(err.Error(), absRoot, "", -1))
	}

	return ret, err
}
// Util functions
// ==============
/*
constructSubPath returns the local path for a given platform-agnostic path
which is relative to the root of the branch. An error is returned if the
resulting path is not the root path of the branch or located below it.
*/
func (b *Branch) constructSubPath(rpath string) (string, error) {

	// Produce the actual subpath - this should also produce windows
	// paths correctly (i.e. foo/bar -> C:\root\foo\bar)

	subPath := filepath.Join(b.rootPath, filepath.FromSlash(rpath))

	// Check that the new sub path is under the root path

	absSubPath, err := filepath.Abs(subPath)
	if err != nil {
		return "", err
	}

	// The comparison must stop at a path separator - a plain prefix check
	// would also accept a sibling like /root-other for the root /root

	sep := string(filepath.Separator)
	rootPrefix := strings.TrimSuffix(b.rootPath, sep) + sep

	if absSubPath == b.rootPath || strings.HasPrefix(absSubPath, rootPrefix) {
		return subPath, nil
	}

	return "", fmt.Errorf("Requested path %v is outside of the branch", rpath)
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package config
import "fmt"
/*
ProductVersion is the current version of Rufs. The about endpoint of the REST
API reports this value as "version".
*/
const ProductVersion = "1.1.1"
/*
Default configuration keys. These are the keys which are expected in the
configuration maps checked by CheckBranchExportConfig and CheckTreeConfig.
*/
const (
// Branch configuration (export)
BranchName = "BranchName" // Name of the exported branch
BranchSecret = "BranchSecret" // Secret which clients need to provide
EnableReadOnly = "EnableReadOnly" // Flag if FS access is readonly for clients
RPCHost = "RPCHost" // Host / interface for RPC communication
RPCPort = "RPCPort" // Communication port of the branch
LocalFolder = "LocalFolder" // Local folder which is being made available
// Tree configuration
TreeSecret = "TreeSecret" // Secret of the tree
)
/*
DefaultBranchExportConfig is the default configuration for an exported branch.
Its key set also defines which keys CheckBranchExportConfig requires.
*/
var DefaultBranchExportConfig = map[string]interface{}{
BranchName: "", // Auto name (based on available network interface)
BranchSecret: "", // Secret needs to be provided by the client
EnableReadOnly: false, // FS access is readonly for clients
RPCHost: "", // Auto (first available external interface)
RPCPort: "9020", // Communication port for this branch
LocalFolder: "share", // Local folder which is being made available
}
/*
DefaultTreeConfig is the default configuration for a tree which imports
branches. Its key set also defines which keys CheckTreeConfig requires.
*/
var DefaultTreeConfig = map[string]interface{}{
TreeSecret: "", // Secret needs to be provided by the client
}
// Helper functions
// ================
/*
CheckBranchExportConfig verifies that a given branch export config contains
every key of DefaultBranchExportConfig. Only the presence of the keys is
checked - not their values. The first missing key which is found is reported
as an error.
*/
func CheckBranchExportConfig(config map[string]interface{}) error {
	for required := range DefaultBranchExportConfig {
		_, present := config[required]
		if present {
			continue
		}
		return fmt.Errorf("Missing %v key in branch export config", required)
	}

	return nil
}
/*
CheckTreeConfig verifies that a given tree config contains every key of
DefaultTreeConfig. Only the presence of the keys is checked - not their
values. The first missing key which is found is reported as an error.
*/
func CheckTreeConfig(config map[string]interface{}) error {
	for required := range DefaultTreeConfig {
		_, present := config[required]
		if present {
			continue
		}
		return fmt.Errorf("Missing %v key in tree config", required)
	}

	return nil
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package rufs
import (
"fmt"
"os"
"path/filepath"
"time"
)
/*
Special unit test flag - use a common mode to gloss over OS specific
defaults. If set, WrapFileInfo reports mode 0777 and size 4096 for
directories and mode 0666 for everything else.
*/
var unitTestModes = false
/*
FileInfo implements os.FileInfo in a platform-agnostic way. The exported
fields carry the data which is returned by the corresponding accessor
methods.
*/
type FileInfo struct {
FiName string // Base name
FiSize int64 // Size in bytes
FiMode os.FileMode // File mode bits
FiModTime time.Time // Modification time
FiChecksum string // Checksum of files (may be empty)
// Private fields which will not be transferred via RPC
isSymLink bool // Flag if this is a symlink (unix)
symLinkTarget string // Target file/directory of the symlink
}
/*
WrapFileInfo wraps a single os.FileInfo object in a serializable FileInfo.
The path argument is the directory which contains the item. For symlinks
which can be resolved the size and mode of the link target are reported.
The checksum of the result is always empty.
*/
func WrapFileInfo(path string, i os.FileInfo) os.FileInfo {
	var target string

	fmode := i.Mode()
	fsize := i.Size()

	// Check if we have a symlink

	symlink := i.Mode()&os.ModeSymlink != 0

	if symlink {
		var err error

		target, err = filepath.EvalSymlinks(filepath.Join(path, i.Name()))

		if err == nil {
			if ti, serr := os.Stat(target); serr == nil {

				// Write in the size of the target and file mode

				fmode = ti.Mode()
				fsize = ti.Size()
			}
		}
	}

	// Unit test fixed file modes

	if unitTestModes {
		if fmode.IsDir() {
			fmode = os.ModeDir | 0777
			fsize = 4096
		} else {
			fmode = 0666
		}
	}

	return &FileInfo{
		FiName:        i.Name(),
		FiSize:        fsize,
		FiMode:        fmode,
		FiModTime:     i.ModTime(),
		FiChecksum:    "",
		isSymLink:     symlink,
		symLinkTarget: target,
	}
}
/*
WrapFileInfos wraps a list of os.FileInfo objects into a list of
serializable FileInfo objects. The given list is modified in place and
returned again. The path argument is the directory which contains the items.
*/
func WrapFileInfos(path string, is []os.FileInfo) []os.FileInfo {
	for idx := 0; idx < len(is); idx++ {
		is[idx] = WrapFileInfo(path, is[idx])
	}

	return is
}
/*
Name returns the base name (the FiName field).
*/
func (rfi *FileInfo) Name() string {
return rfi.FiName
}
/*
Size returns the length in bytes (the FiSize field).
*/
func (rfi *FileInfo) Size() int64 {
return rfi.FiSize
}
/*
Mode returns the file mode bits (the FiMode field).
*/
func (rfi *FileInfo) Mode() os.FileMode {
return rfi.FiMode
}
/*
ModTime returns the modification time (the FiModTime field).
*/
func (rfi *FileInfo) ModTime() time.Time {
return rfi.FiModTime
}
/*
Checksum returns the checksum of this file (the FiChecksum field). May be an
empty string if it was not calculated.
*/
func (rfi *FileInfo) Checksum() string {
return rfi.FiChecksum
}
/*
IsDir returns if this is a directory. The answer is derived from the
directory bit of the stored file mode.
*/
func (rfi *FileInfo) IsDir() bool {
return rfi.FiMode.IsDir()
}
/*
Sys should return the underlying data source but will always return nil
for FileInfo nodes. The method is required by the os.FileInfo interface.
*/
func (rfi *FileInfo) Sys() interface{} {
return nil
}
/*
String returns a human-readable representation of this FileInfo. The result
contains name, size, mode, modification time and the value of Sys. The
checksum is included after the name if it is not empty.
*/
func (rfi *FileInfo) String() string {
	switch checksum := rfi.Checksum(); checksum {
	case "":
		return fmt.Sprintf("%v [%v] %v (%v) - %v", rfi.Name(), rfi.Size(),
			rfi.Mode(), rfi.ModTime(), rfi.Sys())
	default:
		return fmt.Sprintf("%v %s [%v] %v (%v) - %v", rfi.Name(), checksum, rfi.Size(),
			rfi.Mode(), rfi.ModTime(), rfi.Sys())
	}
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
/*
Package rumble contains Rumble functions which interface with Rufs.
*/
package rumble
import (
"fmt"
"os"
"regexp"
"devt.de/krotik/common/defs/rumble"
"devt.de/krotik/common/stringutil"
"devt.de/krotik/rufs/api"
)
// Function: dir
// =============
/*
DirFunc queries a directory in a tree. It is the Rumble function which is
registered under the name "fs.dir". The type carries no state.
*/
type DirFunc struct {
}
/*
Name returns the name of the function which is always "fs.dir".
*/
func (f *DirFunc) Name() string {
return "fs.dir"
}
/*
Validate is called for parameter validation and to reset the function state.
The function accepts either 3 or 4 arguments - any other number of arguments
is answered with an ErrInvalidConstruct runtime error.
*/
func (f *DirFunc) Validate(argsNum int, rt rumble.Runtime) rumble.RuntimeError {
	var err rumble.RuntimeError

	validCount := argsNum == 3 || argsNum == 4

	if !validCount {
		err = rt.NewRuntimeError(rumble.ErrInvalidConstruct,
			"Function dir requires 3 or 4 parameters: tree, a path, a glob expression and optionally a recursive flag")
	}

	return err
}
/*
Execute executes the rumble function.

Expected arguments are: the name of a tree, a path, a glob expression for
file names and optionally a recursive flag. The recursive flag defaults to
false if it is not given.

The result is a list with two elements: a list of paths and a list which
contains for each path a list of entry maps (name, mode, modtime, isdir and
size). Directories are always included in the listing - files only if their
name matches the glob expression. Any error is wrapped in an ErrInvalidState
runtime error.
*/
func (f *DirFunc) Execute(argsVal []interface{}, vars rumble.Variables,
	rt rumble.Runtime) (interface{}, rumble.RuntimeError) {

	var res interface{}
	var paths []string
	var fiList [][]os.FileInfo

	treeName := fmt.Sprint(argsVal[0])
	path := fmt.Sprint(argsVal[1])
	pattern := fmt.Sprint(argsVal[2])

	// The recursive flag is optional (Validate accepts 3 or 4 arguments) -
	// it must only be accessed if it was actually given.

	recursive := len(argsVal) > 3 && argsVal[3] == true

	// Function to convert a list of FileInfo objects into Rumble maps

	conv := func(re *regexp.Regexp, fis []os.FileInfo) []interface{} {
		r := make([]interface{}, 0, len(fis))

		for _, fi := range fis {

			// Directories are always listed - files need to match the pattern

			if !fi.IsDir() && !re.MatchString(fi.Name()) {
				continue
			}

			r = append(r, map[interface{}]interface{}{
				"name":    fi.Name(),
				"mode":    fmt.Sprint(fi.Mode()),
				"modtime": fmt.Sprint(fi.ModTime()),
				"isdir":   fi.IsDir(),
				"size":    fi.Size(),
			})
		}

		return r
	}

	tree, ok, err := api.GetTree(treeName)

	if !ok {
		if err == nil {
			err = fmt.Errorf("Unknown tree: %v", treeName)
		}
	}

	if err == nil {
		var globPattern string

		// Create regex for files

		if globPattern, err = stringutil.GlobToRegex(pattern); err == nil {
			var re *regexp.Regexp

			if re, err = regexp.Compile(globPattern); err == nil {

				// Query the file system

				paths, fiList, err = tree.Dir(path, "", recursive, false)

				pathData := make([]interface{}, 0, len(paths))
				fisData := make([]interface{}, 0, len(paths))

				// Convert the result into a Rumble data structure - every
				// path is included even if none of its files matched

				for i := range paths {
					pathData = append(pathData, paths[i])
					fisData = append(fisData, conv(re, fiList[i]))
				}

				res = []interface{}{pathData, fisData}
			}
		}
	}

	if err != nil {

		// Wrap error message in RuntimeError

		err = rt.NewRuntimeError(rumble.ErrInvalidState,
			fmt.Sprintf("Cannot list files: %v", err.Error()))
	}

	return res, err
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
/*
Package node contains the network communication code for Rufs via RPC calls.
*/
package node
import (
"crypto/tls"
"encoding/gob"
"fmt"
"io"
"net"
"net/rpc"
"os"
"sort"
"strings"
"sync"
"time"
)
/*
init registers the types which are sent as interface values in RPC requests
with the gob package.
*/
func init() {
// Make sure we can use the relevant types in a gob operation
gob.Register(&RufsNodeToken{})
gob.Register(map[string]string{})
}
/*
DialTimeout is the dial timeout for RPC connections. It is used when a new
TCP connection to a peer is established.
*/
var DialTimeout = 10 * time.Second
/*
RufsNodeToken is used to authenticate a node in the network to other nodes
*/
type RufsNodeToken struct {
NodeName string // Name of the node which sends the token
NodeAuth string // Hex encoded hash which is generated from node name and secret
}
/*
Client is the client for the RPC API of a node. It keeps track of known
peers, their expected certificate fingerprints and open connections to them.
*/
type Client struct {
token *RufsNodeToken // Token to be sent to other nodes for authentication
rpc string // This client's rpc network interface (may be empty in case of pure clients)
peers map[string]string // Map of node names to their rpc network interface
conns map[string]*rpc.Client // Map of node names to network connections
fingerprints map[string]string // Map of expected server certificate fingerprints
cert *tls.Certificate // Client certificate
maplock *sync.RWMutex // Lock for maps
redial bool // Flag if this client is attempting a redial
}
/*
SSLFingerprint returns the SSL fingerprint of the client. The fingerprint is
calculated from the first certificate of the client certificate chain. An
empty string is returned if the client has no certificate.
*/
func (c *Client) SSLFingerprint() string {
	var ret string

	// The length check avoids an index out of range panic for certificate
	// objects which have an empty certificate chain

	if c.cert != nil && len(c.cert.Certificate) > 0 && c.cert.Certificate[0] != nil {
		ret = fingerprint(c.cert.Certificate[0])
	}

	return ret
}
/*
Shutdown closes all stored connections and forgets about them afterwards.
Registered peers are kept.
*/
func (c *Client) Shutdown() {
	c.maplock.Lock()
	defer c.maplock.Unlock()

	for node := range c.conns {
		c.conns[node].Close()
	}

	c.conns = make(map[string]*rpc.Client)
}
/*
RegisterPeer registers a new peer to communicate with. An empty fingerprint
means that the client will accept any certificate from the server. Returns
an error if the peer is already registered or if the given rpc interface is
empty.
*/
func (c *Client) RegisterPeer(node string, rpc string, fingerprint string) error {

	// The lock needs to cover the existence check as well - otherwise the
	// map is read while another goroutine might modify it.

	c.maplock.Lock()
	defer c.maplock.Unlock()

	if _, ok := c.peers[node]; ok {
		return fmt.Errorf("Peer already registered: %v", node)
	}

	if rpc == "" {
		return fmt.Errorf("RPC interface must not be empty")
	}

	c.peers[node] = rpc
	delete(c.conns, node)
	c.fingerprints[node] = fingerprint

	return nil
}
/*
Peers returns all registered peers and their expected fingerprints. The
first list contains the sorted peer names - the second list contains the
fingerprint for each peer at the same index.
*/
func (c *Client) Peers() ([]string, []string) {

	// Take the lock before looking at the maps - sizing the result lists
	// from an unlocked read could produce a fingerprint list which is too
	// short if a peer is registered in the meantime.

	c.maplock.Lock()
	defer c.maplock.Unlock()

	ret := make([]string, 0, len(c.peers))

	for k := range c.peers {
		ret = append(ret, k)
	}

	sort.Strings(ret)

	fps := make([]string, len(ret))

	for i, node := range ret {
		fps[i] = c.fingerprints[node]
	}

	return ret, fps
}
/*
RemovePeer removes a registered peer together with its stored connection
and its expected fingerprint. Unknown peers are ignored.
*/
func (c *Client) RemovePeer(node string) {
	c.maplock.Lock()
	defer c.maplock.Unlock()

	delete(c.fingerprints, node)
	delete(c.conns, node)
	delete(c.peers, node)
}
/*
SendPing sends a ping to a node and returns the result. Second argument is
optional if the target member is not a known peer. Should be an empty string
in all other cases. Returns the answer, the fingerprint of the presented server
certificate and any errors.

An unknown node which is given with an rpc interface is only registered for
the duration of the call.
*/
func (c *Client) SendPing(node string, rpc string) ([]string, string, error) {
	var ret []string
	var fp string

	if rpc != "" {

		// Add member temporary if it was not registered - lookup and
		// registration are done under the lock to avoid racing with other
		// goroutines which modify the peer map

		c.maplock.Lock()
		_, known := c.peers[node]
		if !known {
			c.peers[node] = rpc
		}
		c.maplock.Unlock()

		if !known {
			defer func() {
				c.maplock.Lock()
				delete(c.peers, node)
				delete(c.conns, node)
				delete(c.fingerprints, node)
				c.maplock.Unlock()
			}()
		}
	}

	res, err := c.SendRequest(node, RPCPing, nil)

	if res != nil && err == nil {
		ret = res.([]string)

		c.maplock.Lock()
		fp = c.fingerprints[node]
		c.maplock.Unlock()
	}

	return ret, fp, err
}
/*
SendData sends a portion of data and some control information to a node and
returns the result. The node must be a registered peer - otherwise an error
is returned without sending anything.
*/
func (c *Client) SendData(node string, ctrl map[string]string, data []byte) ([]byte, error) {

	// The peer map must only be read while holding the lock

	c.maplock.Lock()
	_, ok := c.peers[node]
	c.maplock.Unlock()

	if !ok {
		return nil, fmt.Errorf("Unknown peer: %v", node)
	}

	res, err := c.SendRequest(node, RPCData, map[RequestArgument]interface{}{
		RequestCTRL: ctrl,
		RequestDATA: data,
	})

	if res != nil {
		return res.([]byte), err
	}

	return nil, err
}
/*
SendRequest sends a request to another node.

The node must be a registered peer. A stored connection is reused if
possible - otherwise a new connection is created. If the client has a
certificate then the connection is wrapped in TLS and the fingerprint of the
presented server certificate is compared with the expected fingerprint of
the peer. If no fingerprint is expected then the presented one is accepted
and stored.

A request which fails because the stored connection was closed is retried
once with a new connection. Returns the response of the remote call or an
error of type *Error.
*/
func (c *Client) SendRequest(node string, remoteCall RPCFunction,
	args map[RequestArgument]interface{}) (interface{}, error) {

	var err error

	// Function to categorize errors

	handleError := func(err error) error {

		if _, ok := err.(net.Error); ok {
			return &Error{ErrNodeComm, err.Error(), false}
		}

		// Wrap remote errors in a proper error object

		if err != nil && !strings.HasPrefix(err.Error(), "RufsError: ") {

			// Check if the error is known to report that a file or directory
			// does not exist.

			err = &Error{ErrRemoteAction, err.Error(), err.Error() == os.ErrNotExist.Error()}
		}

		return err
	}

	c.maplock.Lock()
	laddr, ok := c.peers[node]
	c.maplock.Unlock()

	if !ok {
		return nil, &Error{ErrUnknownTarget, node, false}
	}

	// Get network connection to the node

	c.maplock.Lock()
	conn, ok := c.conns[node]
	c.maplock.Unlock()

	if !ok {

		// Create a new connection if necessary

		nconn, err := net.DialTimeout("tcp", laddr, DialTimeout)

		if err != nil {
			LogDebug(c.token.NodeName, ": ",
				fmt.Sprintf("- %v.%v (laddr=%v err=%v)",
					node, remoteCall, laddr, err))
			return nil, handleError(err)
		}

		if c.cert != nil && len(c.cert.Certificate) > 0 && c.cert.Certificate[0] != nil {

			// Wrap the conn in a TLS client - the server certificate is
			// not verified by the TLS layer but via its fingerprint below

			config := &tls.Config{
				Certificates:       []tls.Certificate{*c.cert},
				InsecureSkipVerify: true,
			}

			tlsconn := tls.Client(nconn, config)

			// Do the handshake and look at the server certificate - a
			// failed handshake must be reported since there is no peer
			// certificate to inspect in this case

			if err := tlsconn.Handshake(); err != nil {
				tlsconn.Close()

				LogDebug(c.token.NodeName, ": ",
					fmt.Sprintf("- %v.%v (laddr=%v err=%v)",
						node, remoteCall, laddr, err))

				return nil, &Error{ErrNodeComm, err.Error(), false}
			}

			peerCerts := tlsconn.ConnectionState().PeerCertificates

			if len(peerCerts) == 0 {
				tlsconn.Close()
				return nil, &Error{ErrUntrustedTarget, node, false}
			}

			rfp := fingerprint(peerCerts[0].Raw)

			c.maplock.Lock()
			expected := c.fingerprints[node]
			c.maplock.Unlock()

			if expected == "" {

				// Accept the certificate and store it

				c.maplock.Lock()
				c.fingerprints[node] = rfp
				c.maplock.Unlock()

			} else if expected != rfp {

				// Fingerprint was NOT verified - the connection is of no
				// further use and must be closed

				tlsconn.Close()

				LogDebug(c.token.NodeName, ": ",
					fmt.Sprintf("Not trusting %v (laddr=%v) presented fingerprint: %v expected fingerprint: %v", node, laddr, rfp, expected))

				return nil, &Error{ErrUntrustedTarget, node, false}
			}

			LogDebug(c.token.NodeName, ": ",
				fmt.Sprintf("%v (laddr=%v) has SSL fingerprint %v ", node, laddr, rfp))

			nconn = tlsconn
		}

		conn = rpc.NewClient(nconn)

		// Store the connection so it can be reused

		c.maplock.Lock()
		c.conns[node] = conn
		c.maplock.Unlock()
	}

	// Assemble the request

	request := map[RequestArgument]interface{}{
		RequestTARGET: node,
		RequestTOKEN:  c.token,
	}

	for k, v := range args {
		request[k] = v
	}

	var response interface{}

	LogDebug(c.token.NodeName, ": ",
		fmt.Sprintf("> %v.%v (laddr=%v)", node, remoteCall, laddr))

	err = conn.Call("RufsServer."+string(remoteCall), request, &response)

	if err == rpc.ErrShutdown || err == io.EOF || err == io.ErrUnexpectedEOF {

		// Delete the closed connection and retry the request once - the
		// redial flag is checked and set under the lock

		c.maplock.Lock()
		retry := !c.redial
		if retry {
			delete(c.conns, node)
			c.redial = true // Set the redial flag to avoid a forever loop
		}
		c.maplock.Unlock()

		if retry {
			return c.SendRequest(node, remoteCall, args)
		}
	}

	// Reset redial flag

	c.maplock.Lock()
	c.redial = false
	c.maplock.Unlock()

	LogDebug(c.token.NodeName, ": ",
		fmt.Sprintf("< %v.%v (err=%v)", node, remoteCall, err))

	return response, handleError(err)
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package node
import (
"errors"
"fmt"
"log"
)
// Logging
// =======
/*
Logger is a function which processes log messages. All given values belong
to a single log message.
*/
type Logger func(v ...interface{})
/*
LogInfo is called if an info message is logged
(by default log.Print)
*/
var LogInfo = Logger(log.Print)
/*
LogDebug is called if a debug message is logged
(by default disabled - messages are passed to LogNull)
*/
var LogDebug = Logger(LogNull)
/*
LogNull is a discarding logger to be used for disabling loggers. It ignores
all given values.
*/
var LogNull = func(v ...interface{}) {
}
// Errors
// ======
/*
Error is a network related error. Its string representation always starts
with "RufsError: ".
*/
type Error struct {
Type error // Error type (to be used for equal checks)
Detail string // Details of this error
IsNotExist bool // Error is file or directory does not exist
}
/*
Error returns a human-readable string representation of this error. The
result always starts with "RufsError: " followed by the error type. The
detail is added in brackets if there is any.
*/
func (ge *Error) Error() string {
	if ge.Detail == "" {
		return fmt.Sprintf("RufsError: %v", ge.Type)
	}

	return fmt.Sprintf("RufsError: %v (%v)", ge.Type, ge.Detail)
}
/*
Network related error types. These values are used as Type of Error objects.
*/
var (
ErrNodeComm = errors.New("Network error")
ErrRemoteAction = errors.New("Remote error")
ErrUnknownTarget = errors.New("Unknown target node")
ErrUntrustedTarget = errors.New("Unexpected SSL certificate from target node")
ErrInvalidToken = errors.New("Invalid node token")
)
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package node
import (
"crypto/sha512"
"crypto/tls"
"fmt"
"net"
"net/rpc"
"sync"
)
/*
RequestHandler is a function to handle incoming requests. A request has a
control object which contains information on what the data is and how it
should be used and the data itself. The request handler should return
the result or an error.
*/
type RequestHandler func(ctrl map[string]string, data []byte) ([]byte, error)
/*
RufsNode is the management object for a node in the Rufs network.
A RufsNode registers itself to the rpc server which is the global
server object. Each node needs to have a unique name. Communication between nodes
is secured by using a secret string which is never exchanged over the network
and a hash generated token which identifies a member.
Each RufsNode object contains a Client object which can be used to communicate
with other nodes. This object should be used by pure clients - code which should
communicate with the cluster without running an actual member.
*/
type RufsNode struct {
name string // Name of the node
secret string // Network wide secret
Client *Client // RPC client object
listener net.Listener // RPC server listener (nil if the node is not running)
wg sync.WaitGroup // RPC server Waitgroup for listener shutdown
DataHandler RequestHandler // Handler function for data requests
cert *tls.Certificate // Node certificate
}
/*
NewNode creates a new RufsNode object. The node gets a client which
authenticates itself with a token generated from the given name and secret.
The given certificate is used as client certificate and as initial node
certificate. The node is not started.
*/
func NewNode(rpcInterface string, name string, secret string, clientCert *tls.Certificate,
	dataHandler RequestHandler) *RufsNode {

	// Generate node token

	auth := fmt.Sprintf("%X", sha512.Sum512_224([]byte(name+secret)))

	client := &Client{
		token:        &RufsNodeToken{NodeName: name, NodeAuth: auth},
		rpc:          rpcInterface,
		peers:        make(map[string]string),
		conns:        make(map[string]*rpc.Client),
		fingerprints: make(map[string]string),
		cert:         clientCert,
		maplock:      &sync.RWMutex{},
		redial:       false,
	}

	return &RufsNode{
		name:        name,
		secret:      secret,
		Client:      client,
		listener:    nil,
		DataHandler: dataHandler,
		cert:        clientCert,
	}
}
/*
NewClient creates a new Client object. The client is taken from a node
object which has an empty name, an empty rpc interface and no data handler.
*/
func NewClient(secret string, clientCert *tls.Certificate) *Client {
return NewNode("", "", secret, clientCert, nil).Client
}
// General node API
// ================
/*
Name returns the name of the node as it was given on creation.
*/
func (rn *RufsNode) Name() string {
return rn.name
}
/*
SSLFingerprint returns the SSL fingerprint of the node. The fingerprint is
calculated from the first certificate of the node certificate chain. An
empty string is returned if the node has no certificate.
*/
func (rn *RufsNode) SSLFingerprint() string {
	var ret string

	// The length check avoids an index out of range panic for certificate
	// objects which have an empty certificate chain

	if rn.cert != nil && len(rn.cert.Certificate) > 0 && rn.cert.Certificate[0] != nil {
		ret = fingerprint(rn.cert.Certificate[0])
	}

	return ret
}
/*
LogInfo logs a node related message at info level. The message is prefixed
with the name of the node.
*/
func (rn *RufsNode) LogInfo(v ...interface{}) {
LogInfo(rn.name, ": ", fmt.Sprint(v...))
}
/*
Start starts the rpc server process for this node. The node listens on the
rpc interface of its client. If a server certificate is given then the
listener is wrapped in TLS and the certificate becomes the node certificate.
The node is registered in the global server map. Returns an error if a node
with the same name is already registered or if the listener could not be
created.
*/
func (rn *RufsNode) Start(serverCert *tls.Certificate) error {
// A node name can only be registered once in the global server map
if _, ok := rufsServer.nodes[rn.name]; ok {
return fmt.Errorf("Cannot start node %s twice", rn.name)
}
rn.LogInfo("Starting node ", rn.name, " rpc server on: ", rn.Client.rpc)
l, err := net.Listen("tcp", rn.Client.rpc)
if err != nil {
return err
}
if serverCert != nil && serverCert.Certificate[0] != nil {
rn.cert = serverCert
rn.LogInfo("SSL fingerprint: ", rn.SSLFingerprint())
// Wrap the listener in a TLS listener
config := tls.Config{Certificates: []tls.Certificate{*serverCert}}
l = tls.NewListener(l, &config)
}
// Kick off the rpc listener
go func() {
rpc.Accept(l)
// NOTE(review): the matching wg.Add(1) is only issued in Shutdown -
// if Accept returns before Shutdown was called this Done presumably
// panics with a negative WaitGroup counter - confirm.
rn.wg.Done()
rn.LogInfo("Connection closed: ", rn.Client.rpc)
}()
rn.listener = l
// Register this node in the global server map
rufsServer.nodes[rn.name] = rn
return nil
}
/*
Shutdown shuts the rpc server of this node down. The listener is closed, all
client connections are closed and the node is removed from the global server
map. The call waits until the accept goroutine of the listener has finished.
Returns the error of closing the listener. Calling Shutdown on a node which
is not running only logs a debug message.
*/
func (rn *RufsNode) Shutdown() error {
var err error
// Close socket
if rn.listener != nil {
rn.LogInfo("Shutdown rpc server on: ", rn.Client.rpc)
// The accept goroutine which was started in Start calls wg.Done once
// the listener is closed
rn.wg.Add(1)
err = rn.listener.Close()
rn.Client.Shutdown()
rn.listener = nil
rn.wg.Wait()
delete(rufsServer.nodes, rn.name)
} else {
LogDebug("Node ", rn.name, " already shut down")
}
return err
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package node
import (
"bytes"
"crypto/sha256"
"crypto/sha512"
"fmt"
"net/rpc"
"devt.de/krotik/common/errorutil"
)
/*
init creates the singleton server instance and registers it as RPC server.
A failing registration is passed to errorutil.AssertOk.
*/
func init() {
// Create singleton Server instance.
rufsServer = &RufsServer{make(map[string]*RufsNode)}
// Register the cluster API as RPC server
errorutil.AssertOk(rpc.Register(rufsServer))
}
/*
RPCFunction is used to identify the called function in a RPC call. The value
is the name of the called method of RufsServer.
*/
type RPCFunction string
/*
List of all possible RPC functions. The list includes all RPC callable functions
in this file.
*/
const (
// General functions
RPCPing RPCFunction = "Ping" // Handled by RufsServer.Ping
RPCData RPCFunction = "Data" // Handled by RufsServer.Data
)
/*
RequestArgument is used to identify arguments in a RPC call. Values of this
type are the keys of a request map.
*/
type RequestArgument int
/*
List of all possible arguments in a RPC request. There are usually no checks which
give back an error if a required argument is missing. The RPC API is an internal
API and might change without backwards compatibility.
*/
const (
// General arguments
RequestTARGET RequestArgument = iota // Required argument which identifies the target node
RequestTOKEN // Client token which is used for authorization checks
RequestCTRL // Control object (i.e. what to do with the data)
RequestDATA // Data object
)
/*
rufsServer is the Server instance which serves rpc calls. It is created
during package initialization.
*/
var rufsServer *RufsServer
/*
RufsServer is the RPC exposed Rufs API of a machine. Server is a singleton and will
route incoming (authenticated) requests to registered RufsNodes. The calling
node is referred to as source node and the called node is referred to as
target node.
*/
type RufsServer struct {
nodes map[string]*RufsNode // Map of local RufsNodes (node name -> node)
}
// General functions
// =================
/*
Ping answers with a Pong if the given client token was verified and the local
node exists. The response is left untouched and an error is returned if the
token check fails.
*/
func (s *RufsServer) Ping(request map[RequestArgument]interface{},
	response *interface{}) error {

	// Verify the given token and retrieve the target member

	_, err := s.checkToken(request)

	if err == nil {

		// Send a simple response

		*response = []string{"Pong"}
	}

	return err
}
/*
Data handles data requests. The request is forwarded to the data handler of
the target node after the client token was verified. The response is only
set if the data handler returns no error. A request to a node without a data
handler is answered with an empty response.

Requests which do not contain a valid control object are rejected with an
error. A missing data object is passed as nil to the data handler.
*/
func (s *RufsServer) Data(request map[RequestArgument]interface{},
	response *interface{}) error {

	// Verify the given token and retrieve the target member

	node, err := s.checkToken(request)

	if err != nil || node.DataHandler == nil {
		return err
	}

	// The request was received over the network - its content must be
	// checked since a failing type assertion would cause a panic

	ctrl, ok := request[RequestCTRL].(map[string]string)
	if !ok {
		return fmt.Errorf("Invalid data request: missing control object")
	}

	data, _ := request[RequestDATA].([]byte)

	// Forward to the registered data handler

	res, err := node.DataHandler(ctrl, data)

	if err == nil {
		*response = res
	}

	return err
}
// Helper functions
// ================
/*
checkToken checks the member token in a given request.

The request must contain the name of a local target node and a token whose
auth value matches the hash of the token's node name and the secret of the
target node. Returns the target node if the check was successful. Returns
ErrUnknownTarget if the target is missing or unknown and ErrInvalidToken if
the token is missing or does not match.
*/
func (s *RufsServer) checkToken(request map[RequestArgument]interface{}) (*RufsNode, error) {

	// The request was received over the network and is not yet
	// authenticated - failing type assertions must not cause a panic

	// Get the target member

	target, ok := request[RequestTARGET].(string)
	if !ok {
		return nil, ErrUnknownTarget
	}

	node, ok := s.nodes[target]
	if !ok {
		return nil, ErrUnknownTarget
	}

	token, ok := request[RequestTOKEN].(*RufsNodeToken)
	if !ok || token == nil {
		return nil, ErrInvalidToken
	}

	// Generate expected auth from given requesting node name in token and secret of target

	expectedAuth := fmt.Sprintf("%X", sha512.Sum512_224([]byte(token.NodeName+node.secret)))

	if token.NodeAuth != expectedAuth {
		return nil, ErrInvalidToken
	}

	return node, nil
}
/*
fingerprint converts a given set of bytes to a fingerprint. The fingerprint
is the SHA-256 hash of the bytes written as lower case hex pairs which are
separated by colons.
*/
func fingerprint(b []byte) string {
	var out bytes.Buffer

	for idx, v := range sha256.Sum256(b) {
		if idx > 0 {
			out.WriteByte(':')
		}
		fmt.Fprintf(&out, "%02x", v)
	}

	return out.String()
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package term
import (
"fmt"
"os"
"devt.de/krotik/common/stringutil"
"devt.de/krotik/rufs"
)
/*
cmdCd shows or changes the current directory. The current directory is
changed if a path is given as first argument. The result is always the
(new) current directory followed by a newline.
*/
func cmdCd(tt *TreeTerm, arg ...string) (string, error) {
if len(arg) > 0 {
tt.cd = tt.parsePathParam(arg[0])
}
return fmt.Sprint(tt.cd, "\n"), nil
}
/*
cmdDir shows a directory listing (not recursive and without checksums).
*/
func cmdDir(tt *TreeTerm, arg ...string) (string, error) {
return cmdDirListing(tt, false, false, arg...)
}
/*
cmdChecksum shows a directory listing and the checksums (not recursive).
*/
func cmdChecksum(tt *TreeTerm, arg ...string) (string, error) {
return cmdDirListing(tt, false, true, arg...)
}
/*
cmdTree shows the listing of a directory and its subdirectories (without
checksums).
*/
func cmdTree(tt *TreeTerm, arg ...string) (string, error) {
return cmdDirListing(tt, true, false, arg...)
}
/*
cmdDirListing shows a directory listing and optionally also its
subdirectories. The first optional argument is the directory to list (the
current directory is used by default). The second optional argument is a
glob expression which is converted into a regex for the query.
*/
func cmdDirListing(tt *TreeTerm, recursive bool, checksum bool, arg ...string) (string, error) {
	var (
		dirs []string
		fis  [][]os.FileInfo
		rex  string
		err  error
	)

	dir := tt.cd

	if len(arg) > 0 {
		dir = tt.parsePathParam(arg[0])
	}

	if len(arg) > 1 {
		if rex, err = stringutil.GlobToRegex(arg[1]); err != nil {
			return "", err
		}
	}

	if dirs, fis, err = tt.tree.Dir(dir, rex, recursive, checksum); err != nil {
		return "", err
	}

	return rufs.DirResultToString(dirs, fis), nil
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package term
import (
"fmt"
"os"
"path"
"path/filepath"
"devt.de/krotik/common/bitutil"
"devt.de/krotik/rufs"
)
/*
cmdCat reads a file and prints its contents to the output of the terminal.
The file path is expected as first argument. The returned string is always
empty.
*/
func cmdCat(tt *TreeTerm, arg ...string) (string, error) {
	if len(arg) == 0 {
		return "", fmt.Errorf("cat requires a file path")
	}

	return "", tt.tree.ReadFileToBuffer(tt.parsePathParam(arg[0]), tt.out)
}
/*
cmdGet retrieves a file and stores it locally (in the current directory).
The first argument is the source file path. The optional second argument is
the name of the local file - only its last path element is used. An existing
local file is overwritten.
*/
func cmdGet(tt *TreeTerm, arg ...string) (string, error) {
	if len(arg) == 0 {
		return "", fmt.Errorf("get requires at least a source file path")
	}

	src := tt.parsePathParam(arg[0])
	dst := src

	if len(arg) > 1 {
		dst = tt.parsePathParam(arg[1])
	}

	// Make sure we only write files to the local folder

	_, dst = filepath.Split(dst)

	f, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
	if err != nil {
		return "", err
	}
	defer f.Close()

	if err = tt.tree.ReadFileToBuffer(src, f); err != nil {
		return "", err
	}

	return fmt.Sprintf("Written file %s", dst), nil
}
/*
cmdPut reads a local file and stores it in the tree. The first argument is
the path of the local source file and the second argument is the destination
path in the tree. Both arguments are required.
*/
func cmdPut(tt *TreeTerm, arg ...string) (string, error) {

	// Both arguments are accessed below - a single argument must be
	// rejected to avoid an index out of range panic

	if len(arg) < 2 {
		return "", fmt.Errorf("put requires a source and destination file path")
	}

	src := arg[0]
	dst := tt.parsePathParam(arg[1])

	f, err := os.Open(src)
	if err != nil {
		return "", err
	}
	defer f.Close()

	if err = tt.tree.WriteFileFromBuffer(dst, f); err != nil {
		return "", err
	}

	return fmt.Sprintf("Written file %s", dst), nil
}
/*
cmdRm deletes a file or directory. The path of the item is expected as first
argument. The returned string is always empty.
*/
func cmdRm(tt *TreeTerm, arg ...string) (string, error) {
	if len(arg) == 0 {
		return "", fmt.Errorf("rm requires a file path")
	}

	dir, file := path.Split(tt.parsePathParam(arg[0]))

	if file == "" {

		// If a path is given just chop off the last slash and try again

		dir, file = path.Split(dir[:len(dir)-1])
	}

	_, err := tt.tree.ItemOp(dir, map[string]string{
		rufs.ItemOpAction: rufs.ItemOpActDelete,
		rufs.ItemOpName:   file,
	})

	return "", err
}
/*
cmdRen renames a file or directory. The first argument is the path of the
item and the second argument is its new name. The new name must not have a
path. The returned string is always empty.
*/
func cmdRen(tt *TreeTerm, arg ...string) (string, error) {
	if len(arg) < 2 {
		return "", fmt.Errorf("ren requires a filename and a new filename")
	}

	dir1, file1 := path.Split(tt.parsePathParam(arg[0]))
	dir2, file2 := path.Split(tt.parsePathParam(arg[1]))

	if file2 == "" || dir2 != "/" {
		return "", fmt.Errorf("new filename must not have a path")
	}

	if file1 == "" {

		// If a path is given just chop off the last slash and try again

		dir1, file1 = path.Split(dir1[:len(dir1)-1])
	}

	_, err := tt.tree.ItemOp(dir1, map[string]string{
		rufs.ItemOpAction:  rufs.ItemOpActRename,
		rufs.ItemOpName:    file1,
		rufs.ItemOpNewName: file2,
	})

	return "", err
}
/*
cmdMkDir creates a new directory. The path of the new directory is expected
as first argument. The returned string is always empty.
*/
func cmdMkDir(tt *TreeTerm, arg ...string) (string, error) {
	if len(arg) == 0 {
		return "", fmt.Errorf("mkdir requires a directory path")
	}

	dir, newdir := path.Split(tt.parsePathParam(arg[0]))

	if newdir == "" {

		// If a path is given just chop off the last slash and try again

		dir, newdir = path.Split(dir[:len(dir)-1])
	}

	_, err := tt.tree.ItemOp(dir, map[string]string{
		rufs.ItemOpAction: rufs.ItemOpActMkDir,
		rufs.ItemOpName:   newdir,
	})

	return "", err
}
/*
cmdCp copies a file or directory. The first argument is the source file or
directory and the second argument is the destination directory. The progress
of the operation is written to the status line of the terminal.
*/
func cmdCp(tt *TreeTerm, arg ...string) (string, error) {
	var err error

	if len(arg) < 2 {
		return "", fmt.Errorf("cp requires a source file or directory and a destination directory")
	}

	src := tt.parsePathParam(arg[0])
	dst := tt.parsePathParam(arg[1])

	// Status update function - the status is cleared if an update
	// reports no written bytes

	progress := func(file string, writtenBytes, totalBytes, currentFile, totalFiles int64) {
		if writtenBytes <= 0 {
			tt.ClearStatus()
			return
		}

		tt.WriteStatus(fmt.Sprintf("Copy %v: %v / %v (%v of %v)", file,
			bitutil.ByteSizeString(writtenBytes, false),
			bitutil.ByteSizeString(totalBytes, false),
			currentFile, totalFiles))
	}

	if err = tt.tree.Copy([]string{src}, dst, progress); err != nil {
		return "", err
	}

	return "Done", nil
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package term
import (
"bytes"
"fmt"
"sort"
"unicode/utf8"
"devt.de/krotik/common/stringutil"
)
/*
cmdHelp executes the help command. Without arguments the result is a sorted
list of all commands of the help map together with their help texts. The
result is empty if any argument is given.
*/
func cmdHelp(tt *TreeTerm, arg ...string) (string, error) {
	var out bytes.Buffer

	if len(arg) != 0 {
		return out.String(), nil
	}

	out.WriteString("Available commands:\n")
	out.WriteString("----\n")

	// Collect all command names and determine the longest one

	width := 0
	names := make([]string, 0, len(helpMap))

	for name := range helpMap {
		names = append(names, name)

		if n := utf8.RuneCountInString(name); n > width {
			width = n
		}
	}

	sort.Strings(names)

	// Write out the commands - the names are padded to the same width

	for _, name := range names {
		pad := stringutil.GenerateRollingString(" ", width-utf8.RuneCountInString(name))
		out.WriteString(fmt.Sprintf("%v%v : %v\n", name, pad, helpMap[name]))
	}

	return out.String(), nil
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package term
import (
"bytes"
"fmt"
)
/*
cmdReset removes all present mount points or branches. The first argument
must either be "mounts" to remove all mount points or "branches" to remove
all branches and mount points.
*/
func cmdReset(tt *TreeTerm, arg ...string) (string, error) {
	if len(arg) > 0 {
		switch arg[0] {
		case "mounts":
			tt.tree.Reset(false)
			return "Resetting all mounts\n", nil

		case "branches":
			tt.tree.Reset(true)
			return "Resetting all branches and mounts\n", nil
		}
	}

	return "", fmt.Errorf("Can either reset all [mounts] or all [branches] which includes all mount points")
}
/*
cmdBranch lists all known branches or adds a new branch to the tree. Without
arguments all active branches and their fingerprints are listed. To add a
branch its name and its rpc interface are required - an expected fingerprint
can be given as optional third argument. The known branches are listed after
the attempt to add a branch.
*/
func cmdBranch(tt *TreeTerm, arg ...string) (string, error) {
	var out bytes.Buffer

	listBranches := func() {
		names, fingerprints := tt.tree.ActiveBranches()

		for idx, name := range names {
			out.WriteString(fmt.Sprintf("%v [%v]\n", name, fingerprints[idx]))
		}
	}

	switch len(arg) {
	case 0:
		listBranches()
		return out.String(), nil

	case 1:
		return out.String(), fmt.Errorf("branch requires either no or at least 2 parameters")
	}

	fp := ""
	if len(arg) > 2 {
		fp = arg[2]
	}

	err := tt.tree.AddBranch(arg[0], arg[1], fp)

	listBranches()

	return out.String(), err
}
/*
cmdMount lists all mount points or adds a new mount point to the tree.

Without arguments the string representation of the tree is returned. With
two or more arguments (directory, branch name and an optional "ro" flag) a
new mapping is added and the resulting tree is returned. A single argument
is an error.
*/
func cmdMount(tt *TreeTerm, arg ...string) (string, error) {
	switch len(arg) {
	case 0:
		return tt.tree.String(), nil

	case 1:
		return "", fmt.Errorf("mount requires either 2 or no parameters")
	}

	// Mounts are writable unless the third argument is exactly "ro"

	readOnly := len(arg) > 2 && arg[2] == "ro"

	if err := tt.tree.AddMapping(arg[0], arg[1], !readOnly); err != nil {
		return "", err
	}

	return tt.tree.String(), nil
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package term
import (
"fmt"
"devt.de/krotik/common/bitutil"
)
/*
cmdSync makes sure dst has the same files and directories as src.

It expects a source and a destination directory as the first two arguments.
Relative paths are resolved against the current directory of the terminal.
Progress is reported on the output writer of the terminal: transfers in
progress update the status line, all other operations are printed as
separate lines.
*/
func cmdSync(tt *TreeTerm, arg ...string) (string, error) {
	if len(arg) < 2 {
		return "", fmt.Errorf("sync requires a source and a destination directory")
	}

	source := tt.parsePathParam(arg[0])
	target := tt.parsePathParam(arg[1])

	progress := func(op, srcFile, dstFile string, writtenBytes, totalBytes, currentFile, totalFiles int64) {

		if writtenBytes <= 0 {

			// No bytes in flight - print the operation as a permanent line

			tt.ClearStatus()
			fmt.Fprintf(tt.out, "%v (%v/%v) %v -> %v\n", op,
				currentFile, totalFiles, srcFile, dstFile)

			return
		}

		// A transfer is in progress - update the status line

		tt.WriteStatus(fmt.Sprintf("%v (%v/%v) writing: %v -> %v %v / %v", op,
			currentFile, totalFiles, srcFile, dstFile,
			bitutil.ByteSizeString(writtenBytes, false),
			bitutil.ByteSizeString(totalBytes, false)))
	}

	if err := tt.tree.Sync(source, target, true, progress); err != nil {
		return "", err
	}

	return "Done", nil
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package term
import (
"fmt"
"io"
"path"
"sort"
"strings"
"unicode/utf8"
"devt.de/krotik/common/stringutil"
"devt.de/krotik/rufs"
)
/*
TreeTerm models a command processor for Rufs trees. It keeps the tree which
commands operate on, the current directory used to resolve relative path
arguments, the writer which receives console output and the last status
line which was written (needed to blank it out again).

Note: NewTreeTerm initialises this struct with a positional literal - the
order of the fields must not be changed without adjusting it.
*/
type TreeTerm struct {
tree *rufs.Tree // Tree which we operate on
cd string // Current directory
out io.Writer // Output writer
lastStatus string // Last status line
}
/*
NewTreeTerm returns a new command processor for Rufs trees. The processor
operates on the given tree, starts in the root directory "/" and writes
its console output to out.
*/
func NewTreeTerm(t *rufs.Tree, out io.Writer) *TreeTerm {
	return &TreeTerm{
		tree:       t,
		cd:         "/",
		out:        out,
		lastStatus: "",
	}
}
/*
WriteStatus writes a status line to the output writer. The cursor is moved
to the start of the line first so the previous status line is overwritten.
If the previous status line was longer than the new one the remainder is
blanked out with spaces.

The lengths are measured in runes (not bytes) so status lines containing
multi-byte characters are padded correctly - this is the same measure
which ClearStatus uses.
*/
func (tt *TreeTerm) WriteStatus(line string) {
	fmt.Fprint(tt.out, "\r")
	fmt.Fprint(tt.out, line)

	prev := utf8.RuneCountInString(tt.lastStatus)
	cur := utf8.RuneCountInString(line)

	if prev > cur {

		// Overwrite what is left of the previous status line

		fmt.Fprint(tt.out, stringutil.GenerateRollingString(" ", prev-cur))
	}

	tt.lastStatus = line
}
/*
ClearStatus removes the last status line and returns the cursor to the
initial position. Nothing is written if no status line was written before.
The remembered status line itself is not reset by this call.
*/
func (tt *TreeTerm) ClearStatus() {
	if tt.lastStatus == "" {
		return
	}

	// Blank out as many columns as the last status line had runes

	width := utf8.RuneCountInString(tt.lastStatus)
	blank := stringutil.GenerateRollingString(" ", width)

	fmt.Fprint(tt.out, "\r")
	fmt.Fprint(tt.out, blank)
	fmt.Fprint(tt.out, "\r")
}
/*
CurrentDir returns the current directory of this TreeTerm. This is the
directory against which parsePathParam resolves relative path arguments.
*/
func (tt *TreeTerm) CurrentDir() string {
return tt.cd
}
/*
AddCmd adds a new command to the terminal. The function cmdFunc is stored
in cmdMap under the name cmd (an existing entry with the same name is
overwritten) and the help text is stored in helpMap under the key helpusage.

NOTE(review): cmdMap and helpMap are not fields of the TreeTerm - they look
like package-level maps shared by all TreeTerm instances. Confirm against
their declaration before relying on per-instance commands.
*/
func (tt *TreeTerm) AddCmd(cmd, helpusage, help string,
cmdFunc func(*TreeTerm, ...string) (string, error)) {
cmdMap[cmd] = cmdFunc
helpMap[helpusage] = help
}
/*
Cmds returns a sorted list of the names of all available terminal commands
(all keys of cmdMap). The result is nil if no command is registered.
*/
func (tt *TreeTerm) Cmds() []string {
	var names []string

	for name := range cmdMap {
		names = append(names, name)
	}

	// Map iteration order is random - sort for a stable result

	sort.Strings(names)

	return names
}
/*
Run executes a given command line and returns its output as a string. File
output and other streams to the console are written to the output writer.

The line is split on single space characters. The first token selects the
command in cmdMap, all following tokens are passed as arguments. An error
is returned if the command is not known.
*/
func (tt *TreeTerm) Run(line string) (string, error) {

	// Parse the input

	tokens := strings.Split(line, " ")
	name := tokens[0]

	handler, known := cmdMap[name]
	if !known {
		return "", fmt.Errorf("Unknown command: %s", name)
	}

	// Arguments stay nil if only a command name was given

	var params []string
	if len(tokens) > 1 {
		params = tokens[1:]
	}

	// Execute the given command

	return handler(tt, params...)
}
/*
cmdPing pings a remote branch. The first argument is the branch name, the
optional second argument is its RPC address (an empty string is used if it
is missing). On success the fingerprint of the branch is reported.
*/
func cmdPing(tt *TreeTerm, arg ...string) (string, error) {
	if len(arg) == 0 {
		return "", fmt.Errorf("ping requires at least a branch name")
	}

	rpc := ""
	if len(arg) > 1 {
		rpc = arg[1]
	}

	fp, err := tt.tree.PingBranch(arg[0], rpc)
	if err != nil {
		return "", err
	}

	return fmt.Sprint("Response ok - fingerprint: ", fp, "\n"), nil
}
/*
cmdRefresh refreshes all known branches and connects depending on if the
branches are reachable. All arguments are ignored. The call delegates to
Tree.Refresh and always returns "Done" and a nil error.
*/
func cmdRefresh(tt *TreeTerm, arg ...string) (string, error) {
tt.tree.Refresh()
return "Done", nil
}
/*
parsePathParam parses a given path parameter and returns an absolute path.
Paths starting with "/" are returned unchanged, all other paths are joined
with the current directory of the terminal.
*/
func (tt *TreeTerm) parsePathParam(p string) string {
	if path.IsAbs(p) {
		return p
	}

	// Take care of relative paths

	return path.Join(tt.cd, p)
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package rufs
import (
"bytes"
"crypto/tls"
"encoding/gob"
"encoding/json"
"fmt"
"io"
"os"
"path"
"regexp"
"sort"
"strings"
"sync"
"time"
"unicode/utf8"
"devt.de/krotik/common/bitutil"
"devt.de/krotik/common/fileutil"
"devt.de/krotik/common/stringutil"
"devt.de/krotik/rufs/config"
"devt.de/krotik/rufs/node"
)
/*
Tree models a Rufs client which combines several branches.

Branches are stored as maps with the keys "branch", "rpc" and "fingerprint".
Mappings are stored as maps with the keys "path", "branch" and "writeable".
The *All lists remember everything which was ever added so that Refresh can
retry branches and mappings which could not be used at the time.

Note: NewTree initialises this struct with a positional literal - the order
of the fields must not be changed without adjusting it.
*/
type Tree struct {
client *node.Client // RPC client
treeLock *sync.RWMutex // Lock for maps
root *treeItem // Tree root item
branches []map[string]string // Added working branches
branchesAll []map[string]string // All added branches also not working
mapping []map[string]interface{} // Mappings from working branches
mappingAll []map[string]interface{} // All used mappings
}
/*
NewTree creates a new tree. The given configuration is validated with
config.CheckTreeConfig - if the check fails its error is returned together
with a nil tree. The tree secret of the configuration and the given
certificate are used to create the RPC client of the tree.
*/
func NewTree(cfg map[string]interface{}, cert *tls.Certificate) (*Tree, error) {

	// Make sure the given config is ok

	if err := config.CheckTreeConfig(cfg); err != nil {
		return nil, err
	}

	// Create RPC client

	secret := fileutil.ConfStr(cfg, config.TreeSecret)

	// Create the tree with an empty root and empty (non-nil) lists

	tree := &Tree{
		client:   node.NewClient(secret, cert),
		treeLock: &sync.RWMutex{},
		root: &treeItem{
			children:            make(map[string]*treeItem),
			remoteBranches:      []string{},
			remoteBranchWriting: []bool{},
		},
		branches:    []map[string]string{},
		branchesAll: []map[string]string{},
		mapping:     []map[string]interface{}{},
		mappingAll:  []map[string]interface{}{},
	}

	return tree, nil
}
/*
Config returns the current tree configuration as a JSON string. The result
is an object with the working branches under "branches" and the mappings
of working branches under "tree".
*/
func (t *Tree) Config() string {
	t.treeLock.RLock()
	defer t.treeLock.RUnlock()

	snapshot := map[string]interface{}{
		"branches": t.branches,
		"tree":     t.mapping,
	}

	// The marshalling error is ignored - on failure an empty string is returned

	encoded, _ := json.MarshalIndent(snapshot, "", " ")

	return string(encoded)
}
/*
SetMapping adds a given tree mapping configuration in a JSON string. The
expected format is the one produced by Config: an object with an optional
list "branches" (entries with the string fields "branch", "rpc" and
"fingerprint") and an optional list "tree" (entries with the string fields
"path" and "branch" and the boolean field "writeable").

The whole configuration is validated before anything is changed. An error
is returned - and the tree is left untouched - if the JSON cannot be parsed
or if an entry has a missing field or a field of the wrong type. If the
configuration is valid the whole tree (branches and mounts) is reset and
then populated from it.

Errors of AddBranch and AddMapping are deliberately not reported: both
remember the given definition also if the branch is currently unreachable
so that it is picked up by a later Refresh.
*/
func (t *Tree) SetMapping(config string) error {
	type branchDef struct {
		name, rpc, fingerprint string
	}
	type mountDef struct {
		path, branch string
		writeable    bool
	}

	var conf map[string][]map[string]interface{}

	// Unmarshal the config

	if err := json.Unmarshal([]byte(config), &conf); err != nil {
		return err
	}

	// strField extracts a mandatory string field from a config entry

	strField := func(section string, idx int, entry map[string]interface{}, key string) (string, error) {
		val, ok := entry[key].(string)
		if !ok {
			return "", fmt.Errorf("%v entry %v: field %q is missing or not a string", section, idx, key)
		}
		return val, nil
	}

	// Validate everything first so an invalid config does not leave
	// a half-initialised tree behind

	var err error

	branches := make([]branchDef, 0, len(conf["branches"]))

	for i, b := range conf["branches"] {
		var def branchDef

		if def.name, err = strField("branches", i, b, "branch"); err != nil {
			return err
		}
		if def.rpc, err = strField("branches", i, b, "rpc"); err != nil {
			return err
		}
		if def.fingerprint, err = strField("branches", i, b, "fingerprint"); err != nil {
			return err
		}

		branches = append(branches, def)
	}

	mounts := make([]mountDef, 0, len(conf["tree"]))

	for i, m := range conf["tree"] {
		var def mountDef
		var ok bool

		if def.path, err = strField("tree", i, m, "path"); err != nil {
			return err
		}
		if def.branch, err = strField("tree", i, m, "branch"); err != nil {
			return err
		}
		if def.writeable, ok = m["writeable"].(bool); !ok {
			return fmt.Errorf("tree entry %v: field %q is missing or not a boolean", i, "writeable")
		}

		mounts = append(mounts, def)
	}

	// Reset the whole tree

	t.Reset(true)

	for _, b := range branches {
		t.AddBranch(b.name, b.rpc, b.fingerprint)
	}

	for _, m := range mounts {
		t.AddMapping(m.path, m.branch, m.writeable)
	}

	return nil
}
/*
KnownBranches returns a map of all known branches (active or not reachable).
The map is keyed by branch name.

Caution: The returned map contains the branch definition maps which are
used by the tree itself (including the maps of active branches with their
fingerprints) - it should only be used for read operations.
*/
func (t *Tree) KnownBranches() map[string]map[string]string {
	ret := make(map[string]map[string]string)

	// Hold the read lock for the whole iteration over the branch list

	t.treeLock.RLock()
	defer t.treeLock.RUnlock()

	for _, b := range t.branchesAll {
		ret[b["branch"]] = b
	}

	return ret
}
/*
ActiveBranches returns a list of all known active branches and their fingerprints.
The two values are exactly what the RPC client returns from Peers(): the
first list holds the names, the second list the fingerprints.

NOTE(review): callers in this code base index the second list with the
index of the first - presumably both lists have the same length and order;
confirm against node.Client.Peers.
*/
func (t *Tree) ActiveBranches() ([]string, []string) {
return t.client.Peers()
}
/*
NotReachableBranches returns a map of all known branches which couldn't be
reached. The map contains the name and the definition of the branch. A
branch counts as not reachable if it is in the list of all added branches
but not in the list of working branches.

Caution: The returned definitions are the maps used by the tree itself -
they should only be used for read operations.
*/
func (t *Tree) NotReachableBranches() map[string]map[string]string {
	ret := make(map[string]map[string]string)

	// Hold the read lock while both branch lists are inspected

	t.treeLock.RLock()
	defer t.treeLock.RUnlock()

	activeBranches := make(map[string]map[string]string)

	for _, b := range t.branches {
		activeBranches[b["branch"]] = b
	}

	for _, b := range t.branchesAll {
		name := b["branch"]

		if _, ok := activeBranches[name]; !ok {
			ret[name] = b
		}
	}

	return ret
}
/*
PingBranch sends a ping to a remote branch and returns its fingerprint or an error.
The parameter node is the name of the branch and rpc its RPC address; both
are passed unchanged to the SendPing call of the RPC client. The first
return value of SendPing is discarded.

Note: the parameter node shadows the imported package node inside this function.
*/
func (t *Tree) PingBranch(node string, rpc string) (string, error) {
_, fp, err := t.client.SendPing(node, rpc)
return fp, err
}
/*
Reset either resets only all mounts or if the branches flag is specified
also all known branches. Resetting the branches removes all peers from the
RPC client and forgets all branch definitions (working or not).

The write lock is held for the whole operation so that the branch lists
are never replaced while another goroutine reads them.
*/
func (t *Tree) Reset(branches bool) {
	t.treeLock.Lock()
	defer t.treeLock.Unlock()

	if branches {
		peers, _ := t.client.Peers()
		for _, p := range peers {
			t.client.RemovePeer(p)
		}

		t.branches = []map[string]string{}
		t.branchesAll = []map[string]string{}
	}

	// Forget all mappings and start with an empty root item

	t.mapping = []map[string]interface{}{}
	t.mappingAll = []map[string]interface{}{}
	t.root = &treeItem{make(map[string]*treeItem), []string{}, []bool{}}
}
/*
Refresh refreshes all known branches and mappings. Only reachable branches will
be mapped into the tree.

Every known branch is pinged. Branches which were not reachable before but
answer now are registered with the RPC client and added to the working
branches. Branches which were working before but no longer answer are
removed from the RPC client and from the working branches. Afterwards the
tree is rebuilt by adding all known mappings again.
*/
func (t *Tree) Refresh() {
	addBranches := make(map[string]map[string]string)
	delBranches := make(map[string]map[string]string)

	nrBranches := t.NotReachableBranches()

	// Check all known branches and decide if they should be added or removed

	t.treeLock.RLock()

	for _, data := range t.branchesAll {
		branchName := data["branch"]
		branchRPC := data["rpc"]

		_, knownAsNotWorking := nrBranches[branchName]

		// Ping the branch

		_, _, err := t.client.SendPing(branchName, branchRPC)

		if err == nil && knownAsNotWorking {

			// Success branch can now be reached

			addBranches[branchName] = data

		} else if err != nil && !knownAsNotWorking {

			// Failure branch can no longer be reached

			delBranches[branchName] = data
		}
	}

	t.treeLock.RUnlock()

	// Now lock the tree and add/remove branches

	t.treeLock.Lock()

	// Build the list of remaining branches in a fresh slice. Deleting from
	// the slice while ranging over it would skip the element following each
	// deleted one and could slice out of range once the list has shrunk.

	remaining := make([]map[string]string, 0, len(t.branches))

	for _, b := range t.branches {
		branchName := b["branch"]

		if _, ok := delBranches[branchName]; ok {
			t.client.RemovePeer(branchName)
			continue
		}

		remaining = append(remaining, b)
	}

	t.branches = remaining

	for _, b := range addBranches {
		branchName := b["branch"]
		branchRPC := b["rpc"]
		branchFingerprint := b["fingerprint"]

		t.client.RegisterPeer(branchName, branchRPC, branchFingerprint)
		t.branches = append(t.branches, b)
	}

	// Rebuild all mappings

	mappings := t.mappingAll

	t.mapping = []map[string]interface{}{}
	t.mappingAll = []map[string]interface{}{}
	t.root = &treeItem{make(map[string]*treeItem), []string{}, []bool{}}

	t.treeLock.Unlock()

	// AddMapping acquires the lock itself so it must be called unlocked.
	// The entries were created by AddMapping - "writeable" is always a bool.

	for _, m := range mappings {
		t.AddMapping(fmt.Sprint(m["path"]), fmt.Sprint(m["branch"]), m["writeable"].(bool))
	}
}
/*
AddBranch adds a branch to the tree.

The branch definition is always remembered in the list of all branches -
also if the branch cannot be reached - so that it can be picked up by a
later Refresh. The branch is only registered with the RPC client and added
to the working branches if it answers a ping and, in case a non-empty
branchFingerprint is given, presents exactly that fingerprint.

An error is returned if the branch cannot be pinged, if it presents an
unexpected fingerprint or if it cannot be registered.
*/
func (t *Tree) AddBranch(branchName string, branchRPC string, branchFingerprint string) error {

	branchMap := map[string]string{
		"branch":      branchName,
		"rpc":         branchRPC,
		"fingerprint": branchFingerprint,
	}

	// The list is read under the lock by other methods - modify it under
	// the lock but release it again before doing any network operation

	t.treeLock.Lock()
	t.branchesAll = append(t.branchesAll, branchMap)
	t.treeLock.Unlock()

	// First ping the branch and see if we get a response

	_, fp, err := t.client.SendPing(branchName, branchRPC)
	if err != nil {
		return err
	}

	// Only add the branch as active if we've seen it

	if branchFingerprint != "" && branchFingerprint != fp {

		// The remote presented fp while the caller expected branchFingerprint

		return fmt.Errorf("Remote branch has an unexpected fingerprint\nPresented fingerprint: %s\nExpected fingerprint : %s", fp, branchFingerprint)
	}

	t.treeLock.Lock()
	defer t.treeLock.Unlock()

	if err = t.client.RegisterPeer(branchName, branchRPC, fp); err == nil {

		// Once we know and accepted the fingerprint we change it
		//
		// Remote branches will never change their fingerprint
		// during a single network session

		branchMap["fingerprint"] = fp

		t.branches = append(t.branches, branchMap) // Store the added branch
	}

	return err
}
/*
AddMapping adds a mapping from tree path to a branch.

The mapping is always remembered in the list of all mappings. It only
becomes part of the tree if branchName is currently a peer of the RPC
client - otherwise node.ErrUnknownTarget is returned.
*/
func (t *Tree) AddMapping(dir, branchName string, writable bool) error {
	t.treeLock.Lock()
	defer t.treeLock.Unlock()

	entry := map[string]interface{}{
		"path":      dir,
		"branch":    branchName,
		"writeable": writable,
	}

	t.mappingAll = append(t.mappingAll, entry)

	// Assume the branch is unknown until it is found in the list of peers

	result := node.ErrUnknownTarget

	peers, _ := t.client.Peers()

	for idx := range peers {
		if peers[idx] != branchName {
			continue
		}

		// Split the given path and add the mapping

		t.root.addMapping(createMappingPath(dir), branchName, writable)
		t.mapping = append(t.mapping, entry)

		result = nil
	}

	return result
}
/*
String returns a string representation of this tree. The result always
starts with "/: " followed by the representation of the root item.

The method is safe to call on a nil tree and on a tree without a lock or
a root item - in these cases only the prefix "/: " is returned.
*/
func (t *Tree) String() string {
	var buf bytes.Buffer

	buf.WriteString("/: ")

	// The nil check must come before any field of t is accessed

	if t == nil {
		return buf.String()
	}

	if t.treeLock != nil {
		t.treeLock.RLock()
		defer t.treeLock.RUnlock()
	}

	if t.root != nil {
		t.root.String(1, &buf)
	}

	return buf.String()
}
// Client API
// ==========
/*
Dir returns file listings matching a given pattern of one or more directories.
The contents of the given path is returned. Optionally, also the contents of
all subdirectories can be returned if the recursive flag is set. The return
values is a list of traversed directories and their corresponding contents.

The pattern must be a valid regular expression - otherwise the compile error
is returned and no request is made. The pattern string, the recursive flag
and the checksums flag are sent unchanged to every branch which is mapped
on the given path. The results of all branches are merged: if several
branches return the same directory only the first file info of each name
is kept.

In a second pass pseudo directory entries are added for child mappings
which match the pattern and which have no entry of the same name in the
listing of their parent.

If a request fails no further requests are made. The results which were
collected until then are returned together with the error.
*/
func (t *Tree) Dir(dir string, pattern string, recursive bool, checksums bool) ([]string, [][]os.FileInfo, error) {
var err error
var dirs []string
var fis [][]os.FileInfo
// Compile pattern - the compiled expression is only used locally to
// filter the pseudo directories; branches receive the pattern string
re, err := regexp.Compile(pattern)
if err != nil {
return nil, nil, err
}
t.treeLock.RLock()
defer t.treeLock.RUnlock()
// Strip off a trailing slash to normalize the input
if strings.HasSuffix(dir, "/") {
dir = dir[:len(dir)-1]
}
// treeVisitor queries all branches of a visited tree item and merges
// their answers into dirs and fis. It also sets the outer err variable.
treeVisitor := func(item *treeItem, treePath string, branchPath []string, branches []string, writable []bool) {
for _, b := range branches {
var res []byte
// Skip all remaining branches once an error has occurred
if err == nil {
res, err = t.client.SendData(b, map[string]string{
ParamAction: OpDir,
ParamPath: path.Join(branchPath...),
ParamPattern: fmt.Sprint(pattern),
ParamRecursive: fmt.Sprint(recursive),
ParamChecksums: fmt.Sprint(checksums),
}, nil)
if err == nil {
var dest []interface{}
// Unpack the result
if err = gob.NewDecoder(bytes.NewBuffer(res)).Decode(&dest); err == nil {
// NOTE(review): unchecked index access and type assertions - an
// unexpected answer of a branch would cause a panic here
bdirs := dest[0].([]string)
bfis := dest[1].([][]os.FileInfo)
// Construct the actual tree path for the returned directories
for i, d := range bdirs {
bdirs[i] = path.Join(treePath, d)
// Merge these results into the overall results
found := false
// Note: the loop variable dir shadows the parameter dir in this loop
for j, dir := range dirs {
// Check if a directory from the result is already
// in the overall result
if dir == bdirs[i] {
found = true
// Create a map of existing names to avoid duplicates
existing := make(map[string]bool)
for _, fi := range fis[j] {
existing[fi.Name()] = true
}
// Only add new files to the overall result
for _, fi := range bfis[i] {
if _, ok := existing[fi.Name()]; !ok {
fis[j] = append(fis[j], fi)
}
}
}
}
if !found {
// Just append if the directory is not in the
// overall results yet
dirs = append(dirs, bdirs[i])
fis = append(fis, bfis[i])
}
}
}
}
}
}
}
t.root.findPathBranches("/", createMappingPath(dir), recursive, treeVisitor)
// Add pseudo directories for mapping components which have no corresponding
// real directories
// dirsMap maps a directory path to its index in dirs and fis
dirsMap := make(map[string]int)
for i, d := range dirs {
dirsMap[d] = i
}
t.root.findPathBranches("/", createMappingPath(dir), recursive,
func(item *treeItem, treePath string, branchPath []string, branches []string, writable []bool) {
// Ignore tree items whose path does not start with the requested directory
if !strings.HasPrefix(treePath, dir) {
return
}
idx, ok := dirsMap[treePath]
if !ok {
// Create the entry if it does not exist
dirs = append(dirs, treePath)
idx = len(dirs) - 1
dirsMap[treePath] = idx
fis = append(fis, []os.FileInfo{})
}
// Add pseudo dirs if a physical directory is not present
for n := range item.children {
found := false
for _, fi := range fis[idx] {
if fi.Name() == n {
found = true
break
}
}
if found {
continue
}
if re.MatchString(n) {
// Append if it matches the pattern - a pseudo directory has
// size 0, a zero modification time and mode directory with 0777
fis[idx] = append(fis[idx], &FileInfo{
FiName: n,
FiSize: 0,
FiMode: os.FileMode(os.ModeDir | 0777),
FiModTime: time.Time{},
})
}
}
})
return dirs, fis, err
}
/*
Stat returns information about a given item. Use this function to find out
if a given path is a file or directory.

The parent directory of the item is listed (non-recursive and with
checksums) using the name of the item as pattern. The entry whose name is
exactly the name of the item is returned - together with whatever error
the listing produced. If no such entry is found either the error of the
listing or a node.Error with IsNotExist set is returned.

Note: the name is handed to Dir as a pattern and Dir compiles patterns as
regular expressions - names which are not valid expressions make Dir fail.
*/
func (t *Tree) Stat(item string) (os.FileInfo, error) {
	parent, name := path.Split(item)

	_, listings, err := t.Dir(parent, name, false, true)

	if len(listings) == 1 {
		for idx := range listings[0] {
			if candidate := listings[0][idx]; candidate.Name() == name {
				return candidate, err
			}
		}
	}

	if err != nil {
		return nil, err
	}

	return nil, &node.Error{
		Type:       node.ErrRemoteAction,
		Detail:     os.ErrNotExist.Error(),
		IsNotExist: true,
	}
}
/*
Copy is a general purpose copy function which creates files and directories.
Destination must be a directory. A non-existing destination
directory will be created.

Every entry of src can be a file or a directory. Single files are copied
to the root of the destination. Directories are searched recursively and
their files are copied to a directory of the same name in the destination.
A file which is found more than once under the same relative path is only
copied once.

The optional function updFunc receives progress updates: the relative path
of the file, the transferred bytes so far (or a negative value once the
source has been read completely), the size of the file, the number of the
current file and the number of all files. It may be nil.

Nothing is copied if one of the sources cannot be inspected or listed.
Copying stops at the first file which fails to copy.
*/
func (t *Tree) Copy(src []string, dst string,
	updFunc func(file string, writtenBytes, totalBytes, currentFile, totalFiles int64)) error {

	var relPaths []string

	files := make(map[string]os.FileInfo) // Make sure any file is only copied once
	paths := make(map[string]string)

	// Use a no-op if no update function was given so it can be called unconditionally

	notify := updFunc
	if notify == nil {
		notify = func(string, int64, int64, int64, int64) {}
	}

	// addFile remembers a file for copying - the order of the first
	// occurrence is kept, a later occurrence replaces the source

	addFile := func(rel string, fi os.FileInfo, srcPath string) {
		if _, seen := files[rel]; !seen {
			relPaths = append(relPaths, rel)
		}
		files[rel] = fi
		paths[rel] = srcPath
	}

	// Add files to be copied to items

	for _, s := range src {

		fi, err := t.Stat(s)
		if err != nil {
			return fmt.Errorf("Cannot stat %v: %v", s, err.Error())
		}

		if fi == nil {
			continue
		}

		if !fi.IsDir() {

			// Single files are just added - these files will always
			// be at the root of the destination

			addFile("/"+fi.Name(), fi, s)

			continue
		}

		// Find all files inside directories - a failed listing must not
		// be mistaken for an empty directory

		dirs, fis, err := t.Dir(s, "", true, false)
		if err != nil {
			return fmt.Errorf("Cannot list %v: %v", s, err.Error())
		}

		for i, d := range dirs {
			for _, fi2 := range fis[i] {
				if fi2.IsDir() {
					continue
				}

				// Calculate the relative path by removing
				// source path from the absolute path

				full := path.Join(d, fi2.Name())
				rel := path.Join("/"+fi.Name(), full[len(s):])

				addFile(rel, fi2, full)
			}
		}
	}

	// Copy all found files

	var cnt int64

	allFiles := int64(len(files))

	for _, rel := range relPaths {
		var totalTransferred int64

		cnt++

		totalSize := files[rel].Size()
		srcFile := paths[rel]

		err := t.CopyFile(srcFile, path.Join(dst, rel), func(b int) {
			if b >= 0 {
				totalTransferred += int64(b)
				notify(rel, totalTransferred, totalSize, cnt, allFiles)
			} else {

				// Negative values signal the end of the transfer

				notify(rel, int64(b), totalSize, cnt, allFiles)
			}
		})

		if err != nil {
			return fmt.Errorf("Cannot copy %v to %v: %v", srcFile, dst, err.Error())
		}
	}

	return nil
}
/*
Sync operations. These names are passed as the op argument to the update
function which can be given to Tree.Sync.
*/
const (
SyncCreateDirectory = "Create directory" // A directory is created in the destination
SyncCopyFile = "Copy file" // A file is copied to the destination
SyncRemoveDirectory = "Remove directory" // A directory is removed from the destination
SyncRemoveFile = "Remove file" // A file is removed from the destination
)
/*
Sync a given destination with a given source directory. After this command has
finished the dstDir will have the same files and directories as the srcDir.

Directories which are missing in the destination are created. Files are
copied if they are missing in the destination or if their checksum
differs. Files and directories which only exist in the destination are
deleted. Subdirectories are only considered if recursive is set.

The optional function updFunc receives progress updates: the operation (one
of the Sync* constants), the source and destination path, the transferred
and total bytes of a file copy and the number of the current and of all
source entries. It may be nil.

The tree lock is not held by this function itself: all tree operations
which are used (Dir, ItemOp and CopyFile) do their own locking and a
sync.RWMutex must not be read-locked recursively.

The operation stops at the first error.
*/
func (t *Tree) Sync(srcDir string, dstDir string, recursive bool,
	updFunc func(op, srcFile, dstFile string, writtenBytes, totalBytes, currentFile, totalFiles int64)) error {

	var currentFile, totalFiles int64

	// doSync syncs a given src directory

	doSync := func(dir string, finfos []os.FileInfo) error {
		sdir := path.Join(srcDir, dir)
		ddir := path.Join(dstDir, dir)

		// Query the corresponding destination to see what is there

		_, dstFis, err := t.Dir(ddir, "", false, true)

		if err == nil {
			fileMap := make(map[string]string) // Map to quickly lookup destination files
			dirMap := make(map[string]bool)    // Map to quickly lookup destination directories

			if len(dstFis) > 0 {
				for _, fi := range dstFis[0] {
					if fi.IsDir() {
						dirMap[fi.Name()] = true
					} else {
						fileMap[fi.Name()] = fi.(*FileInfo).Checksum()
					}
				}
			}

			// Go through the given source file infos and see what needs to be copied

			for _, fi := range finfos {
				currentFile++

				// Check if we have a directory or a file

				if fi.IsDir() {

					if _, ok := dirMap[fi.Name()]; !ok {

						// Create all directories which aren't there

						if updFunc != nil {
							updFunc(SyncCreateDirectory, "", path.Join(ddir, fi.Name()), 0, 0, currentFile, totalFiles)
						}

						_, err = t.ItemOp(ddir, map[string]string{
							ItemOpAction: ItemOpActMkDir,
							ItemOpName:   fi.Name(),
						})
					}

					// Remove existing directories from the map so we can
					// use the map to remove directories which shouldn't
					// be there

					delete(dirMap, fi.Name())

				} else {

					fsum, ok := fileMap[fi.Name()]

					if !ok || fsum != fi.(*FileInfo).Checksum() {

						s := path.Join(sdir, fi.Name())
						d := path.Join(ddir, fi.Name())

						// Copy the file if it does not exist or the checksum
						// is not matching. CopyFile always gets a callable
						// update function - a no-op if updFunc is nil.

						u := func(b int) {}

						if updFunc != nil {
							var totalTransferred, totalSize int64

							totalSize = fi.Size()

							u = func(b int) {
								if b >= 0 {
									totalTransferred += int64(b)
									updFunc(SyncCopyFile, s, d, totalTransferred, totalSize, currentFile, totalFiles)
								} else {
									updFunc(SyncCopyFile, s, d, int64(b), totalSize, currentFile, totalFiles)
								}
							}
						}

						if err = t.CopyFile(s, d, u); err != nil && updFunc != nil {

							// Note at which point the error message was produced

							updFunc(SyncCopyFile, s, d, 0, fi.Size(), currentFile, totalFiles)
						}
					}

					// Remove existing files from the map so we can
					// use the map to remove files which shouldn't
					// be there

					delete(fileMap, fi.Name())
				}

				if err != nil {
					break
				}
			}

			if err == nil {

				// Remove files and directories which are in the destination but
				// not in the source

				for d := range dirMap {
					if err == nil {
						if updFunc != nil {
							p := path.Join(ddir, d)
							updFunc(SyncRemoveDirectory, "", p, 0, 0, currentFile, totalFiles)
						}

						_, err = t.ItemOp(ddir, map[string]string{
							ItemOpAction: ItemOpActDelete,
							ItemOpName:   d,
						})
					}
				}

				for f := range fileMap {
					if err == nil {
						if updFunc != nil {
							p := path.Join(ddir, f)
							updFunc(SyncRemoveFile, "", p, 0, 0, currentFile, totalFiles)
						}

						_, err = t.ItemOp(ddir, map[string]string{
							ItemOpAction: ItemOpActDelete,
							ItemOpName:   f,
						})
					}
				}
			}
		}

		return err
	}

	// We only query the source once otherwise we might end up in an
	// endless loop if for example the dstDir is a subdirectory of srcDir

	srcDirs, srcFis, err := t.Dir(srcDir, "", recursive, true)

	if err == nil {

		for _, fis := range srcFis {
			totalFiles += int64(len(fis))
		}

		for i, dir := range srcDirs {
			if err = doSync(relPath(dir, srcDir), srcFis[i]); err != nil {
				break
			}
		}
	}

	return err
}
/*
CopyFile copies a given file using a simple io.Pipe.

The source is first probed with an empty read. If the probe fails the error
is returned - unless it signals the end of the file: then the source is an
empty file and an empty destination file is written. Otherwise the source
is read in a separate goroutine and streamed through the pipe into the
destination file.

The optional function updFunc receives the number of bytes of every chunk
read from the source and -1 once the source has been read completely. For
an empty source file it is called once with 0. It may be nil.

Read errors are reported before write errors. The function only returns
after the reading goroutine has finished.

The tree lock is not held by this function itself: ReadFile and WriteFile
do their own locking and a sync.RWMutex must not be read-locked recursively.
*/
func (t *Tree) CopyFile(srcPath, dstPath string, updFunc func(writtenBytes int)) error {

	// Make sure the src exists

	if _, perr := t.ReadFile(srcPath, []byte{}, 0); perr != nil {

		if !IsEOF(perr) {
			return perr
		}

		// We got an empty file

		_, err := t.WriteFile(dstPath, nil, 0)

		if updFunc != nil {
			updFunc(0) // Report the creation of the empty file
		}

		return err
	}

	// Use a pipe to stream the contents of the source file to the destination file

	pr, pipeWriter := io.Pipe()

	var pw io.WriteCloser = pipeWriter

	if updFunc != nil {

		// Wrap the writer of the pipe

		pw = &statusUpdatingWriter{pw, updFunc}
	}

	// Read the source in a go routine - the result is handed over via
	// a channel so there is no unsynchronised access to a shared variable

	readDone := make(chan error, 1)

	go func() {
		rerr := t.ReadFileToBuffer(srcPath, pw)
		pw.Close()
		readDone <- rerr
	}()

	// Write the destination file - this will return once the
	// writer is closed

	err := t.WriteFileFromBuffer(dstPath, pr)

	// Closing the reader unblocks the go routine if writing stopped early

	pr.Close()

	rerr := <-readDone

	// Read errors are reported before write errors. A closed pipe error
	// after a failed write is only the consequence of closing the reader
	// above and must not hide the write error.

	if rerr != nil && !(err != nil && rerr == io.ErrClosedPipe) {
		err = rerr
	}

	return err
}
/*
ReadFileToBuffer reads a complete file into a given buffer which implements
io.Writer. The file is read in chunks of DefaultReadBufferSize bytes until
the end of the file is signalled. Reaching the end of the file is not
reported as an error - any other read error or a write error is returned.
*/
func (t *Tree) ReadFileToBuffer(spath string, buf io.Writer) error {
	var pos int64

	chunk := make([]byte, DefaultReadBufferSize)

	for {
		count, err := t.ReadFile(spath, chunk, pos)

		if err != nil {
			if IsEOF(err) {

				// We reached the end of the file

				return nil
			}

			return err
		}

		if _, err = buf.Write(chunk[:count]); err != nil {
			return err
		}

		pos += int64(count)
	}
}
/*
ReadFile reads up to len(p) bytes into p from the given offset. It
returns the number of bytes read (0 <= n <= len(p)) and any error
encountered.

All branches which are mapped on the directory of the file are asked in
turn until one of them answers successfully or signals the end of the
file. If no branch is mapped at all a node.Error with IsNotExist set is
returned. Otherwise the result of the last asked branch is returned.
*/
func (t *Tree) ReadFile(spath string, p []byte, offset int64) (int, error) {
	t.treeLock.RLock()
	defer t.treeLock.RUnlock()

	var (
		read int
		done bool

		// The file does not exist unless a branch tells us otherwise

		err error = &node.Error{
			Type:       node.ErrRemoteAction,
			Detail:     os.ErrNotExist.Error(),
			IsNotExist: true,
		}
	)

	dir, file := path.Split(spath)

	visitor := func(item *treeItem, treePath string, branchPath []string, branches []string, writable []bool) {

		for _, branch := range branches {

			if done {

				// Only try other branches if we didn't have a success before

				break
			}

			remotePath := path.Join(path.Join(branchPath...), file)

			var payload []byte

			payload, err = t.client.SendData(branch, map[string]string{
				ParamAction: OpRead,
				ParamPath:   remotePath,
				ParamOffset: fmt.Sprint(offset),
				ParamSize:   fmt.Sprint(len(p)),
			}, nil)

			if err == nil {
				var dest []interface{}

				// Unpack the result

				if err = gob.NewDecoder(bytes.NewBuffer(payload)).Decode(&dest); err == nil {
					read = dest[0].(int)
					copy(p, dest[1].([]byte))
				}
			}

			// Special case EOF - it also ends the search

			eof := IsEOF(err)
			done = err == nil || eof
		}
	}

	t.root.findPathBranches(dir, createMappingPath(dir), false, visitor)

	return read, err
}
/*
WriteFileFromBuffer writes a complete file from a given buffer which implements
io.Reader. The buffer is read in chunks of up to DefaultReadBufferSize
bytes and every chunk is written to the file at the current offset.

Bytes which are returned by a read are always written before the error of
the same read is considered - an io.Reader is allowed to return the last
bytes together with the end of file signal.

Once the end of the buffer is reached a final empty write is sent with
the total size as offset. The result of that write is ignored. Any other
read error and any write error of a chunk is returned.
*/
func (t *Tree) WriteFileFromBuffer(spath string, buf io.Reader) error {
	var offset int64

	writeBuf := make([]byte, DefaultReadBufferSize)

	for {
		n, rerr := buf.Read(writeBuf)

		// Process the read bytes first and look at the read error afterwards

		if n > 0 || rerr == nil {
			if _, werr := t.WriteFile(spath, writeBuf[:n], offset); werr != nil {
				return werr
			}

			offset += int64(n)
		}

		if rerr != nil {

			if IsEOF(rerr) {

				// We reached the end of the file

				t.WriteFile(spath, []byte{}, offset)

				return nil
			}

			return rerr
		}
	}
}
/*
WriteFile writes p into the given file from the given offset. It
returns the number of written bytes and any error encountered.

The data is sent to all writable branches which are mapped on the
directory of the file. Branches which are not writable are skipped. After
the first error no further branch is contacted. An error is also returned
if there was not a single writable branch. The returned number of bytes
is the one reported by the last branch which answered.
*/
func (t *Tree) WriteFile(spath string, p []byte, offset int64) (int, error) {
	t.treeLock.RLock()
	defer t.treeLock.RUnlock()

	var (
		err                    error
		written, seen, skipped int
	)

	dir, file := path.Split(spath)

	visitor := func(item *treeItem, treePath string, branchPath []string, branches []string, writable []bool) {

		for idx, branch := range branches {

			if err != nil {

				// Do nothing more once an error has occurred

				continue
			}

			seen++

			if !writable[idx] {

				// Ignore all non-writable branches

				skipped++
				continue
			}

			remotePath := path.Join(path.Join(branchPath...), file)

			var reply []byte

			if reply, err = t.client.SendData(branch, map[string]string{
				ParamAction: OpWrite,
				ParamPath:   remotePath,
				ParamOffset: fmt.Sprint(offset),
			}, p); err == nil {

				err = gob.NewDecoder(bytes.NewBuffer(reply)).Decode(&written)
			}
		}
	}

	t.root.findPathBranches(dir, createMappingPath(dir), false, visitor)

	if err == nil && seen == skipped {
		err = fmt.Errorf("All applicable branches for the requested path were mounted as not writable")
	}

	return written, err
}
/*
ItemOp executes a file or directory specific operation which can either
succeed or fail (e.g. rename or delete). Actions and parameters should
be given in the opdata map.

The given map is copied and not modified. The operation is sent to all
writable branches which are mapped on the given directory. If the item
name of the operation ends with "**" also all mappings below the directory
are included. The result is true if at least one branch reported true.

Branches which report that the item does not exist are only counted. An
error is returned if not a single branch was writable or if the item did
not exist on any of the writable branches. After any other error no
further branch is contacted.
*/
func (t *Tree) ItemOp(dir string, opdata map[string]string) (bool, error) {
	t.treeLock.RLock()
	defer t.treeLock.RUnlock()

	var (
		err                    error
		anyOK                  bool
		seen, skipped, missing int
	)

	// Work on a copy of the given data

	request := make(map[string]string)

	for key, value := range opdata {
		request[key] = value
	}

	request[ParamAction] = OpItemOp

	// Check if we should recurse - a missing name yields an empty string

	deep := strings.HasSuffix(request[ItemOpName], "**")

	visitor := func(item *treeItem, treePath string, branchPath []string,
		branches []string, writable []bool) {

		for idx, branch := range branches {

			seen++

			if !writable[idx] {

				// Ignore all non-writable branches

				skipped++
				continue
			}

			if err != nil {
				continue
			}

			request[ParamPath] = path.Join(branchPath...)

			var reply []byte

			reply, err = t.client.SendData(branch, request, nil)

			if nerr, ok := err.(*node.Error); ok && nerr.IsNotExist {

				// Only count the not exist errors as this might only
				// be true for some branches

				missing++
				err = nil

				continue
			}

			if err != nil {
				continue
			}

			// Unpack the result of the operation

			var branchOK bool

			err = gob.NewDecoder(bytes.NewBuffer(reply)).Decode(&branchOK)

			anyOK = anyOK || branchOK // One positive result is enough
		}
	}

	t.root.findPathBranches(dir, createMappingPath(dir), deep, visitor)

	switch {
	case seen == skipped:
		err = fmt.Errorf("All applicable branches for the requested path were mounted as not writable")

	case seen == missing+skipped:
		err = &node.Error{
			Type:       node.ErrRemoteAction,
			Detail:     os.ErrNotExist.Error(),
			IsNotExist: true,
		}
	}

	return anyOK, err
}
// Util functions
// ==============
/*
createMappingPath properly splits a given path into a mapping path. The
path is split at every "/" and all empty components (caused by leading,
trailing or repeated slashes) are dropped. The result is nil if the path
has no components.
*/
func createMappingPath(path string) []string {
	var components []string

	for _, part := range strings.Split(path, "/") {

		// Only keep non-empty child names

		if part != "" {
			components = append(components, part)
		}
	}

	return components
}
/*
DirResultToString formats a given Dir result into a human-readable string.

Note: the given slices are sorted in place - the paths (together with
their infos) by path and the file infos of every path by name. For every
path a block is written which starts with the path followed by one line
per entry with mode, right-aligned size and name. The checksum is added
in brackets if the entry has one. Blocks are separated by an empty line.
*/
func DirResultToString(paths []string, infos [][]os.FileInfo) string {
	var out bytes.Buffer

	// Sort the paths

	sort.Sort(&dirResult{paths, infos})

	// Sort the FileInfos within the paths

	for idx := range infos {
		sort.Sort(fileInfoSlice(infos[idx]))
	}

	last := len(paths) - 1

	for idx, dir := range paths {
		entries := infos[idx]

		out.WriteString(dir)
		out.WriteString("\n")

		// Render all sizes first to find the widest one (in runes)

		width := 0
		sizes := make([]string, len(entries))

		for k, entry := range entries {
			size := bitutil.ByteSizeString(entry.Size(), false)

			if strings.HasSuffix(size, " B") {
				size += " " // Unit should always be 3 runes
			}

			if w := utf8.RuneCountInString(size); w > width {
				width = w
			}

			sizes[k] = size
		}

		for k, entry := range entries {
			size := sizes[k]
			pad := stringutil.GenerateRollingString(" ",
				width-utf8.RuneCountInString(size))

			if rfi, ok := entry.(*FileInfo); ok && rfi.FiChecksum != "" {
				fmt.Fprintf(&out, "%v %v%v %v [%s]\n", entry.Mode(), pad,
					size, entry.Name(), rfi.Checksum())
			} else {
				fmt.Fprintf(&out, "%v %v%v %v\n", entry.Mode(), pad,
					size, entry.Name())
			}
		}

		// Separate the blocks but do not end with an empty line

		if idx < last {
			out.WriteString("\n")
		}
	}

	return out.String()
}
// Helper functions
// ================
// Helper function to normalise relative paths
/*
relPath creates a normalized relative path by removing a given path prefix.
Both parameters are normalized to start with a slash and to have one
trailing slash less if they end with one. If the normalized path starts
with the normalized prefix the prefix is cut off - "/" is returned if
nothing remains. Otherwise the normalized path is returned.
*/
func relPath(path, prefix string) string {

	// normalize ensures a leading slash and removes a single trailing slash

	normalize := func(p string) string {
		if !strings.HasPrefix(p, "/") {
			p = "/" + p
		}
		return strings.TrimSuffix(p, "/")
	}

	full := normalize(path)
	base := normalize(prefix)

	if !strings.HasPrefix(full, base) {
		return full
	}

	if rest := full[len(base):]; rest != "" {
		return rest
	}

	return "/"
}
// Helper objects to sort dir results
/*
dirResult is a sortable view of a Dir result. Sorting orders the paths
lexicographically and keeps the file infos of every path at the same index
as its path.
*/
type dirResult struct {
	paths []string
	infos [][]os.FileInfo
}

/*
Len returns the number of paths.
*/
func (r *dirResult) Len() int {
	return len(r.paths)
}

/*
Less reports if the path at index i sorts before the path at index j.
*/
func (r *dirResult) Less(i, j int) bool {
	return r.paths[i] < r.paths[j]
}

/*
Swap exchanges the paths and the file infos at the indices i and j.
*/
func (r *dirResult) Swap(i, j int) {
	pathI, infoI := r.paths[i], r.infos[i]

	r.paths[i], r.infos[i] = r.paths[j], r.infos[j]
	r.paths[j], r.infos[j] = pathI, infoI
}
/*
fileInfoSlice is a list of file infos which can be sorted by name.
*/
type fileInfoSlice []os.FileInfo

/*
Len returns the number of file infos.
*/
func (f fileInfoSlice) Len() int {
	return len(f)
}

/*
Less reports if the name at index i sorts before the name at index j.
*/
func (f fileInfoSlice) Less(i, j int) bool {
	nameI, nameJ := f[i].Name(), f[j].Name()

	return nameI < nameJ
}

/*
Swap exchanges the file infos at the indices i and j.
*/
func (f fileInfoSlice) Swap(i, j int) {
	infoI := f[i]

	f[i] = f[j]
	f[j] = infoI
}
// Helper object to given status updates when copying files
/*
statusUpdatingWriter is an internal io.WriteCloser which is used for status
updates. It wraps another io.WriteCloser and reports every write and the
closing of the writer to a status function.
*/
type statusUpdatingWriter struct {
	io.WriteCloser
	statusUpdate func(writtenBytes int)
}

/*
Write writes len(p) bytes from p to the writer. The number of bytes which
were written by the wrapped writer is reported to the status function -
also if the write returned an error.
*/
func (w *statusUpdatingWriter) Write(p []byte) (int, error) {
	written, err := w.WriteCloser.Write(p)

	w.statusUpdate(written)

	return written, err
}

/*
Close closes the writer. The status function is called with -1 before
the wrapped writer is closed.
*/
func (w *statusUpdatingWriter) Close() error {
	w.statusUpdate(-1)

	err := w.WriteCloser.Close()

	return err
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package rufs
import (
"bytes"
"fmt"
"path"
"sort"
"devt.de/krotik/common/errorutil"
"devt.de/krotik/common/stringutil"
)
/*
treeItem models an item in the tree. This is an internal data structure
which is not exposed.
*/
type treeItem struct {
	children            map[string]*treeItem // Mapping from path component to branch
	remoteBranches      []string             // List of remote branches which are present on this level
	remoteBranchWriting []bool               // Flag if the remote branch should receive write requests
}

/*
findPathBranches finds all relevant branches for a single path. The visit
function is called for this item and then for every matching item further
down the tree. It receives 5 parameters: The tree item, the total path within
the tree, the remaining subpath within the branch, the list of all branches
of the tree item and the list of write flags of these branches.

As long as branchPath has components only the child matching the first
component is followed. Once branchPath is exhausted and recursive is set all
children are visited in alphabetical order.

Calling code should always give a treePath of "/".
*/
func (t *treeItem) findPathBranches(treePath string, branchPath []string,
	recursive bool, visit func(*treeItem, string, []string, []string, []bool)) {

	visit(t, treePath, branchPath, t.remoteBranches, t.remoteBranchWriting)

	switch {

	case len(branchPath) > 0:

		// Follow the subpath if there is a child for its first component

		head := branchPath[0]

		if child, found := t.children[head]; found {
			child.findPathBranches(path.Join(treePath, head),
				branchPath[1:], recursive, visit)
		}

	case recursive:

		// Visit all children - sort the names to get a stable order

		names := make([]string, 0, len(t.children))
		for name := range t.children {
			names = append(names, name)
		}
		sort.Strings(names)

		for _, name := range names {
			t.children[name].findPathBranches(path.Join(treePath, name),
				branchPath, recursive, visit)
		}
	}
}
/*
addMapping adds a new mapping. The components of mappingPath are followed
down the tree starting at this item and missing children are created on the
way. The branch name and its write flag are then appended to the item at the
end of the path. Empty path components are not supported and fail the
assertion.
*/
func (t *treeItem) addMapping(mappingPath []string, branchName string, writable bool) {
	target := t

	// Walk down the tree and create missing children on the way

	for _, component := range mappingPath {

		errorutil.AssertTrue(component != "",
			"Adding a mapping with an empty path is not supported")

		next, exists := target.children[component]

		if !exists {
			next = &treeItem{
				children:            make(map[string]*treeItem),
				remoteBranches:      []string{},
				remoteBranchWriting: []bool{},
			}
			target.children[component] = next
		}

		target = next
	}

	// Add branch name to the final item - keep the order in which the branches were added

	target.remoteBranches = append(target.remoteBranches, branchName)
	target.remoteBranchWriting = append(target.remoteBranchWriting, writable)
}
/*
String writes a string representation of this item and its children into buf.
The first line lists the branches of this item, each followed by "(w)" if it
receives write requests or "(r)" otherwise. Every child follows on its own
line in alphabetical order, prefixed by its name and padded according to the
given indent.
*/
func (t *treeItem) String(indent int, buf *bytes.Buffer) {

	// Write the branches of this item as a comma separated list

	for idx, branch := range t.remoteBranches {
		if idx > 0 {
			buf.WriteString(", ")
		}

		buf.WriteString(branch)

		mode := "(r)"
		if t.remoteBranchWriting[idx] {
			mode = "(w)"
		}
		buf.WriteString(mode)
	}

	buf.WriteString("\n")

	// Write the children in a stable order

	childNames := make([]string, 0, len(t.children))
	for name := range t.children {
		childNames = append(childNames, name)
	}
	sort.Strings(childNames)

	for _, name := range childNames {
		child := t.children[name]

		buf.WriteString(stringutil.GenerateRollingString(" ", indent*2))
		fmt.Fprintf(buf, "%v/: ", name)

		child.String(indent+1, buf)
	}
}
/*
* Rufs - Remote Union File System
*
* Copyright 2017 Matthias Ladkau. All rights reserved.
*
* This Source Code Form is subject to the terms of the MIT
* License, If a copy of the MIT License was not distributed with this
* file, You can obtain one at https://opensource.org/licenses/MIT.
*/
package rufs
import (
"io"
"devt.de/krotik/rufs/node"
)
/*
IsEOF tests if the given error is an EOF error. This is the case if the error
is io.EOF itself or if it is a node.Error whose detail is the message of
io.EOF.
*/
func IsEOF(err error) bool {

	if err == io.EOF {
		return true
	}

	// Check for an EOF which is only carried as the detail message of a node error

	rerr, isNodeErr := err.(*node.Error)

	return isNodeErr && rerr.Detail == io.EOF.Error()
}